1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 """
21 The proton module defines a suite of APIs that implement the AMQP 1.0
22 protocol.
23
24 The proton APIs consist of the following classes:
25
26 - L{Messenger} -- A messaging endpoint.
27 - L{Message} -- A class for creating and/or accessing AMQP message content.
28 - L{Data} -- A class for creating and/or accessing arbitrary AMQP encoded
29 data.
30
31 """
32
33 from cproton import *
34 from wrapper import Wrapper
35
36 import weakref, socket, sys, threading
37 try:
38 import uuid
42
43 except ImportError:
44 """
45 No 'native' UUID support. Provide a very basic UUID type that is a compatible subset of the uuid type provided by more modern python releases.
46 """
47 import struct
50 - def __init__(self, hex=None, bytes=None):
51 if [hex, bytes].count(None) != 1:
52 raise TypeError("need one of hex or bytes")
53 if bytes is not None:
54 self.bytes = bytes
55 elif hex is not None:
56 fields=hex.split("-")
57 fields[4:5] = [fields[4][:4], fields[4][4:]]
58 self.bytes = struct.pack("!LHHHHL", *[int(x,16) for x in fields])
59
61 if isinstance(other, uuid.UUID):
62 return cmp(self.bytes, other.bytes)
63 else:
64 return -1
65
67 return "%08x-%04x-%04x-%04x-%04x%08x" % struct.unpack("!LHHHHL", self.bytes)
68
70 return "UUID(%r)" % str(self)
71
74
75 import os, random, time
76 rand = random.Random()
77 rand.seed((os.getpid(), time.time(), socket.gethostname()))
79 bytes = [rand.randint(0, 255) for i in xrange(16)]
80
81
82 bytes[7] &= 0x0F
83 bytes[7] |= 0x40
84
85
86 bytes[8] &= 0x3F
87 bytes[8] |= 0x80
88 return "".join(map(chr, bytes))
89
91 return uuid.UUID(bytes=random_uuid())
92
95
96 try:
97 bytes()
98 except NameError:
99 bytes = str
100
101 VERSION_MAJOR = PN_VERSION_MAJOR
102 VERSION_MINOR = PN_VERSION_MINOR
103 API_LANGUAGE = "C"
104 IMPLEMENTATION_LANGUAGE = "C"
113
115 """
116 The root of the proton exception hierarchy. All proton exception
117 classes derive from this exception.
118 """
119 pass
120
122 """
123 A timeout exception indicates that a blocking operation has timed
124 out.
125 """
126 pass
127
129 """
130 An interrupt exception indicaes that a blocking operation was interrupted.
131 """
132 pass
133
135 """
136 The root of the messenger exception hierarchy. All exceptions
137 generated by the messenger class derive from this exception.
138 """
139 pass
140
142 """
143 The MessageException class is the root of the message exception
144 hierarhcy. All exceptions generated by the Message class derive from
145 this exception.
146 """
147 pass
148
149 EXCEPTIONS = {
150 PN_TIMEOUT: Timeout,
151 PN_INTR: Interrupt
152 }
153
154 PENDING = Constant("PENDING")
155 ACCEPTED = Constant("ACCEPTED")
156 REJECTED = Constant("REJECTED")
157 RELEASED = Constant("RELEASED")
158 MODIFIED = Constant("MODIFIED")
159 ABORTED = Constant("ABORTED")
160 SETTLED = Constant("SETTLED")
161
162 STATUSES = {
163 PN_STATUS_ABORTED: ABORTED,
164 PN_STATUS_ACCEPTED: ACCEPTED,
165 PN_STATUS_REJECTED: REJECTED,
166 PN_STATUS_RELEASED: RELEASED,
167 PN_STATUS_MODIFIED: MODIFIED,
168 PN_STATUS_PENDING: PENDING,
169 PN_STATUS_SETTLED: SETTLED,
170 PN_STATUS_UNKNOWN: None
171 }
172
173 AUTOMATIC = Constant("AUTOMATIC")
174 MANUAL = Constant("MANUAL")
177 """
178 The L{Messenger} class defines a high level interface for sending
179 and receiving L{Messages<Message>}. Every L{Messenger} contains a
180 single logical queue of incoming messages and a single logical queue
181 of outgoing messages. These messages in these queues may be destined
182 for, or originate from, a variety of addresses.
183
184 The messenger interface is single-threaded. All methods
185 except one (L{interrupt}) are intended to be used from within
186 the messenger thread.
187
188
189 Address Syntax
190 ==============
191
192 An address has the following form::
193
194 [ amqp[s]:// ] [user[:password]@] domain [/[name]]
195
196 Where domain can be one of::
197
198 host | host:port | ip | ip:port | name
199
200 The following are valid examples of addresses:
201
202 - example.org
203 - example.org:1234
204 - amqp://example.org
205 - amqps://example.org
206 - example.org/incoming
207 - amqps://example.org/outgoing
208 - amqps://fred:trustno1@example.org
209 - 127.0.0.1:1234
210 - amqps://127.0.0.1:1234
211
212 Sending & Receiving Messages
213 ============================
214
215 The L{Messenger} class works in conjuction with the L{Message} class. The
216 L{Message} class is a mutable holder of message content.
217
218 The L{put} method copies its L{Message} to the outgoing queue, and may
219 send queued messages if it can do so without blocking. The L{send}
220 method blocks until it has sent the requested number of messages,
221 or until a timeout interrupts the attempt.
222
223
224 >>> message = Message()
225 >>> for i in range(3):
226 ... message.address = "amqp://host/queue"
227 ... message.subject = "Hello World %i" % i
228 ... messenger.put(message)
229 >>> messenger.send()
230
231 Similarly, the L{recv} method receives messages into the incoming
232 queue, and may block as it attempts to receive the requested number
233 of messages, or until timeout is reached. It may receive fewer
234 than the requested number. The L{get} method pops the
235 eldest L{Message} off the incoming queue and copies it into the L{Message}
236 object that you supply. It will not block.
237
238
239 >>> message = Message()
240 >>> messenger.recv(10):
241 >>> while messenger.incoming > 0:
242 ... messenger.get(message)
243 ... print message.subject
244 Hello World 0
245 Hello World 1
246 Hello World 2
247
248 The blocking flag allows you to turn off blocking behavior entirely,
249 in which case L{send} and L{recv} will do whatever they can without
250 blocking, and then return. You can then look at the number
251 of incoming and outgoing messages to see how much outstanding work
252 still remains.
253 """
254
256 """
257 Construct a new L{Messenger} with the given name. The name has
258 global scope. If a NULL name is supplied, a UUID based name will
259 be chosen.
260
261 @type name: string
262 @param name: the name of the messenger or None
263
264 """
265 self._mng = pn_messenger(name)
266 self._selectables = {}
267
269 """
270 Destroy the L{Messenger}. This will close all connections that
271 are managed by the L{Messenger}. Call the L{stop} method before
272 destroying the L{Messenger}.
273 """
274 if hasattr(self, "_mng"):
275 pn_messenger_free(self._mng)
276 del self._mng
277
279 if err < 0:
280 if (err == PN_INPROGRESS):
281 return
282 exc = EXCEPTIONS.get(err, MessengerException)
283 raise exc("[%s]: %s" % (err, pn_error_text(pn_messenger_error(self._mng))))
284 else:
285 return err
286
287 @property
289 """
290 The name of the L{Messenger}.
291 """
292 return pn_messenger_name(self._mng)
293
295 return pn_messenger_get_certificate(self._mng)
296
298 self._check(pn_messenger_set_certificate(self._mng, value))
299
300 certificate = property(_get_certificate, _set_certificate,
301 doc="""
302 Path to a certificate file for the L{Messenger}. This certificate is
303 used when the L{Messenger} accepts or establishes SSL/TLS connections.
304 This property must be specified for the L{Messenger} to accept
305 incoming SSL/TLS connections and to establish client authenticated
306 outgoing SSL/TLS connection. Non client authenticated outgoing SSL/TLS
307 connections do not require this property.
308 """)
309
311 return pn_messenger_get_private_key(self._mng)
312
314 self._check(pn_messenger_set_private_key(self._mng, value))
315
316 private_key = property(_get_private_key, _set_private_key,
317 doc="""
318 Path to a private key file for the L{Messenger's<Messenger>}
319 certificate. This property must be specified for the L{Messenger} to
320 accept incoming SSL/TLS connections and to establish client
321 authenticated outgoing SSL/TLS connection. Non client authenticated
322 SSL/TLS connections do not require this property.
323 """)
324
326 return pn_messenger_get_password(self._mng)
327
329 self._check(pn_messenger_set_password(self._mng, value))
330
331 password = property(_get_password, _set_password,
332 doc="""
333 This property contains the password for the L{Messenger.private_key}
334 file, or None if the file is not encrypted.
335 """)
336
338 return pn_messenger_get_trusted_certificates(self._mng)
339
341 self._check(pn_messenger_set_trusted_certificates(self._mng, value))
342
343 trusted_certificates = property(_get_trusted_certificates,
344 _set_trusted_certificates,
345 doc="""
346 A path to a database of trusted certificates for use in verifying the
347 peer on an SSL/TLS connection. If this property is None, then the peer
348 will not be verified.
349 """)
350
352 t = pn_messenger_get_timeout(self._mng)
353 if t == -1:
354 return None
355 else:
356 return millis2secs(t)
357
359 if value is None:
360 t = -1
361 else:
362 t = secs2millis(value)
363 self._check(pn_messenger_set_timeout(self._mng, t))
364
365 timeout = property(_get_timeout, _set_timeout,
366 doc="""
367 The timeout property contains the default timeout for blocking
368 operations performed by the L{Messenger}.
369 """)
370
372 return pn_messenger_is_blocking(self._mng)
373
375 self._check(pn_messenger_set_blocking(self._mng, b))
376
377 blocking = property(_is_blocking, _set_blocking,
378 doc="""
379 Enable or disable blocking behavior during L{Message} sending
380 and receiving. This affects every blocking call, with the
381 exception of L{work}. Currently, the affected calls are
382 L{send}, L{recv}, and L{stop}.
383 """)
384
386 return pn_messenger_is_passive(self._mng)
387
389 self._check(pn_messenger_set_passive(self._mng, b))
390
391 passive = property(_is_passive, _set_passive,
392 doc="""
393 When passive is set to true, Messenger will not attempt to perform I/O
394 internally. In this mode it is necessary to use the selectables API to
395 drive any I/O needed to perform requested actions. In this mode
396 Messenger will never block.
397 """)
398
400 return pn_messenger_get_incoming_window(self._mng)
401
403 self._check(pn_messenger_set_incoming_window(self._mng, window))
404
405 incoming_window = property(_get_incoming_window, _set_incoming_window,
406 doc="""
407 The incoming tracking window for the messenger. The messenger will
408 track the remote status of this many incoming deliveries after they
409 have been accepted or rejected. Defaults to zero.
410
411 L{Messages<Message>} enter this window only when you take them into your application
412 using L{get}. If your incoming window size is I{n}, and you get I{n}+1 L{messages<Message>}
413 without explicitly accepting or rejecting the oldest message, then the
414 message that passes beyond the edge of the incoming window will be assigned
415 the default disposition of its link.
416 """)
417
419 return pn_messenger_get_outgoing_window(self._mng)
420
422 self._check(pn_messenger_set_outgoing_window(self._mng, window))
423
424 outgoing_window = property(_get_outgoing_window, _set_outgoing_window,
425 doc="""
426 The outgoing tracking window for the messenger. The messenger will
427 track the remote status of this many outgoing deliveries after calling
428 send. Defaults to zero.
429
430 A L{Message} enters this window when you call the put() method with the
431 message. If your outgoing window size is I{n}, and you call L{put} I{n}+1
432 times, status information will no longer be available for the
433 first message.
434 """)
435
437 """
438 Currently a no-op placeholder.
439 For future compatibility, do not L{send} or L{recv} messages
440 before starting the L{Messenger}.
441 """
442 self._check(pn_messenger_start(self._mng))
443
445 """
446 Transitions the L{Messenger} to an inactive state. An inactive
447 L{Messenger} will not send or receive messages from its internal
448 queues. A L{Messenger} should be stopped before being discarded to
449 ensure a clean shutdown handshake occurs on any internally managed
450 connections.
451 """
452 self._check(pn_messenger_stop(self._mng))
453
454 @property
456 """
457 Returns true iff a L{Messenger} is in the stopped state.
458 This function does not block.
459 """
460 return pn_messenger_stopped(self._mng)
461
463 """
464 Subscribes the L{Messenger} to messages originating from the
465 specified source. The source is an address as specified in the
466 L{Messenger} introduction with the following addition. If the
467 domain portion of the address begins with the '~' character, the
468 L{Messenger} will interpret the domain as host/port, bind to it,
469 and listen for incoming messages. For example "~0.0.0.0",
470 "amqp://~0.0.0.0", and "amqps://~0.0.0.0" will all bind to any
471 local interface and listen for incoming messages with the last
472 variant only permitting incoming SSL connections.
473
474 @type source: string
475 @param source: the source of messages to subscribe to
476 """
477 sub_impl = pn_messenger_subscribe(self._mng, source)
478 if not sub_impl:
479 self._check(pn_error_code(pn_messenger_error(self._mng)))
480 raise MessengerException("Cannot subscribe to %s"%source)
481 return Subscription(sub_impl)
482
483 - def put(self, message):
484 """
485 Places the content contained in the message onto the outgoing
486 queue of the L{Messenger}. This method will never block, however
487 it will send any unblocked L{Messages<Message>} in the outgoing
488 queue immediately and leave any blocked L{Messages<Message>}
489 remaining in the outgoing queue. The L{send} call may be used to
490 block until the outgoing queue is empty. The L{outgoing} property
491 may be used to check the depth of the outgoing queue.
492
493 When the content in a given L{Message} object is copied to the outgoing
494 message queue, you may then modify or discard the L{Message} object
495 without having any impact on the content in the outgoing queue.
496
497 This method returns an outgoing tracker for the L{Message}. The tracker
498 can be used to determine the delivery status of the L{Message}.
499
500 @type message: Message
501 @param message: the message to place in the outgoing queue
502 @return: a tracker
503 """
504 message._pre_encode()
505 self._check(pn_messenger_put(self._mng, message._msg))
506 return pn_messenger_outgoing_tracker(self._mng)
507
509 """
510 Gets the last known remote state of the delivery associated with
511 the given tracker.
512
513 @type tracker: tracker
514 @param tracker: the tracker whose status is to be retrieved
515
516 @return: one of None, PENDING, REJECTED, MODIFIED, or ACCEPTED
517 """
518 disp = pn_messenger_status(self._mng, tracker);
519 return STATUSES.get(disp, disp)
520
522 """
523 Checks if the delivery associated with the given tracker is still
524 waiting to be sent.
525
526 @type tracker: tracker
527 @param tracker: the tracker whose status is to be retrieved
528
529 @return: true if delivery is still buffered
530 """
531 return pn_messenger_buffered(self._mng, tracker);
532
533 - def settle(self, tracker=None):
534 """
535 Frees a L{Messenger} from tracking the status associated with a given
536 tracker. If you don't supply a tracker, all outgoing L{messages<Message>} up
537 to the most recent will be settled.
538 """
539 if tracker is None:
540 tracker = pn_messenger_outgoing_tracker(self._mng)
541 flags = PN_CUMULATIVE
542 else:
543 flags = 0
544 self._check(pn_messenger_settle(self._mng, tracker, flags))
545
546 - def send(self, n=-1):
547 """
548 This call will block until the indicated number of L{messages<Message>}
549 have been sent, or until the operation times out. If n is -1 this call will
550 block until all outgoing L{messages<Message>} have been sent. If n is 0 then
551 this call will send whatever it can without blocking.
552 """
553 self._check(pn_messenger_send(self._mng, n))
554
555 - def recv(self, n=None):
556 """
557 Receives up to I{n} L{messages<Message>} into the incoming queue. If no value
558 for I{n} is supplied, this call will receive as many L{messages<Message>} as it
559 can buffer internally. If the L{Messenger} is in blocking mode, this
560 call will block until at least one L{Message} is available in the
561 incoming queue.
562 """
563 if n is None:
564 n = -1
565 self._check(pn_messenger_recv(self._mng, n))
566
567 - def work(self, timeout=None):
568 """
569 Sends or receives any outstanding L{messages<Message>} queued for a L{Messenger}.
570 This will block for the indicated timeout.
571 This method may also do I/O work other than sending and receiving
572 L{messages<Message>}. For example, closing connections after messenger.L{stop}()
573 has been called.
574 """
575 if timeout is None:
576 t = -1
577 else:
578 t = secs2millis(timeout)
579 err = pn_messenger_work(self._mng, t)
580 if (err == PN_TIMEOUT):
581 return False
582 else:
583 self._check(err)
584 return True
585
586 @property
588 return pn_messenger_receiving(self._mng)
589
591 """
592 The L{Messenger} interface is single-threaded.
593 This is the only L{Messenger} function intended to be called
594 from outside of the L{Messenger} thread.
595 Call this from a non-messenger thread to interrupt
596 a L{Messenger} that is blocking.
597 This will cause any in-progress blocking call to throw
598 the L{Interrupt} exception. If there is no currently blocking
599 call, then the next blocking call will be affected, even if it
600 is within the same thread that interrupt was called from.
601 """
602 self._check(pn_messenger_interrupt(self._mng))
603
604 - def get(self, message=None):
605 """
606 Moves the message from the head of the incoming message queue into
607 the supplied message object. Any content in the message will be
608 overwritten.
609
610 A tracker for the incoming L{Message} is returned. The tracker can
611 later be used to communicate your acceptance or rejection of the
612 L{Message}.
613
614 If None is passed in for the L{Message} object, the L{Message}
615 popped from the head of the queue is discarded.
616
617 @type message: Message
618 @param message: the destination message object
619 @return: a tracker
620 """
621 if message is None:
622 impl = None
623 else:
624 impl = message._msg
625 self._check(pn_messenger_get(self._mng, impl))
626 if message is not None:
627 message._post_decode()
628 return pn_messenger_incoming_tracker(self._mng)
629
630 - def accept(self, tracker=None):
631 """
632 Signal the sender that you have acted on the L{Message}
633 pointed to by the tracker. If no tracker is supplied,
634 then all messages that have been returned by the L{get}
635 method are accepted, except those that have already been
636 auto-settled by passing beyond your incoming window size.
637
638 @type tracker: tracker
639 @param tracker: a tracker as returned by get
640 """
641 if tracker is None:
642 tracker = pn_messenger_incoming_tracker(self._mng)
643 flags = PN_CUMULATIVE
644 else:
645 flags = 0
646 self._check(pn_messenger_accept(self._mng, tracker, flags))
647
648 - def reject(self, tracker=None):
649 """
650 Rejects the L{Message} indicated by the tracker. If no tracker
651 is supplied, all messages that have been returned by the L{get}
652 method are rejected, except those that have already been auto-settled
653 by passing beyond your outgoing window size.
654
655 @type tracker: tracker
656 @param tracker: a tracker as returned by get
657 """
658 if tracker is None:
659 tracker = pn_messenger_incoming_tracker(self._mng)
660 flags = PN_CUMULATIVE
661 else:
662 flags = 0
663 self._check(pn_messenger_reject(self._mng, tracker, flags))
664
665 @property
667 """
668 The outgoing queue depth.
669 """
670 return pn_messenger_outgoing(self._mng)
671
672 @property
674 """
675 The incoming queue depth.
676 """
677 return pn_messenger_incoming(self._mng)
678
679 - def route(self, pattern, address):
680 """
681 Adds a routing rule to a L{Messenger's<Messenger>} internal routing table.
682
683 The route procedure may be used to influence how a L{Messenger} will
684 internally treat a given address or class of addresses. Every call
685 to the route procedure will result in L{Messenger} appending a routing
686 rule to its internal routing table.
687
688 Whenever a L{Message} is presented to a L{Messenger} for delivery, it
689 will match the address of this message against the set of routing
690 rules in order. The first rule to match will be triggered, and
691 instead of routing based on the address presented in the message,
692 the L{Messenger} will route based on the address supplied in the rule.
693
694 The pattern matching syntax supports two types of matches, a '%'
695 will match any character except a '/', and a '*' will match any
696 character including a '/'.
697
698 A routing address is specified as a normal AMQP address, however it
699 may additionally use substitution variables from the pattern match
700 that triggered the rule.
701
702 Any message sent to "foo" will be routed to "amqp://foo.com":
703
704 >>> messenger.route("foo", "amqp://foo.com");
705
706 Any message sent to "foobar" will be routed to
707 "amqp://foo.com/bar":
708
709 >>> messenger.route("foobar", "amqp://foo.com/bar");
710
711 Any message sent to bar/<path> will be routed to the corresponding
712 path within the amqp://bar.com domain:
713
714 >>> messenger.route("bar/*", "amqp://bar.com/$1");
715
716 Route all L{messages<Message>} over TLS:
717
718 >>> messenger.route("amqp:*", "amqps:$1")
719
720 Supply credentials for foo.com:
721
722 >>> messenger.route("amqp://foo.com/*", "amqp://user:password@foo.com/$1");
723
724 Supply credentials for all domains:
725
726 >>> messenger.route("amqp://*", "amqp://user:password@$1");
727
728 Route all addresses through a single proxy while preserving the
729 original destination:
730
731 >>> messenger.route("amqp://%/*", "amqp://user:password@proxy/$1/$2");
732
733 Route any address through a single broker:
734
735 >>> messenger.route("*", "amqp://user:password@broker/$1");
736 """
737 self._check(pn_messenger_route(self._mng, unicode2utf8(pattern), unicode2utf8(address)))
738
739 - def rewrite(self, pattern, address):
740 """
741 Similar to route(), except that the destination of
742 the L{Message} is determined before the message address is rewritten.
743
744 The outgoing address is only rewritten after routing has been
745 finalized. If a message has an outgoing address of
746 "amqp://0.0.0.0:5678", and a rewriting rule that changes its
747 outgoing address to "foo", it will still arrive at the peer that
748 is listening on "amqp://0.0.0.0:5678", but when it arrives there,
749 the receiver will see its outgoing address as "foo".
750
751 The default rewrite rule removes username and password from addresses
752 before they are transmitted.
753 """
754 self._check(pn_messenger_rewrite(self._mng, unicode2utf8(pattern), unicode2utf8(address)))
755
757 return Selectable.wrap(pn_messenger_selectable(self._mng))
758
759 @property
761 tstamp = pn_messenger_deadline(self._mng)
762 if tstamp:
763 return millis2secs(tstamp)
764 else:
765 return None
766
768 """The L{Message} class is a mutable holder of message content.
769
770 @ivar instructions: delivery instructions for the message
771 @type instructions: dict
772 @ivar annotations: infrastructure defined message annotations
773 @type annotations: dict
774 @ivar properties: application defined message properties
775 @type properties: dict
776 @ivar body: message body
777 @type body: bytes | unicode | dict | list | int | long | float | UUID
778 """
779
780 DEFAULT_PRIORITY = PN_DEFAULT_PRIORITY
781
782 - def __init__(self, body=None, **kwargs):
783 """
784 @param kwargs: Message property name/value pairs to initialise the Message
785 """
786 self._msg = pn_message()
787 self._id = Data(pn_message_id(self._msg))
788 self._correlation_id = Data(pn_message_correlation_id(self._msg))
789 self.instructions = None
790 self.annotations = None
791 self.properties = None
792 self.body = body
793 for k,v in kwargs.iteritems():
794 getattr(self, k)
795 setattr(self, k, v)
796
798 if hasattr(self, "_msg"):
799 pn_message_free(self._msg)
800 del self._msg
801
803 if err < 0:
804 exc = EXCEPTIONS.get(err, MessageException)
805 raise exc("[%s]: %s" % (err, pn_error_text(pn_message_error(self._msg))))
806 else:
807 return err
808
827
828 - def _post_decode(self):
829 inst = Data(pn_message_instructions(self._msg))
830 ann = Data(pn_message_annotations(self._msg))
831 props = Data(pn_message_properties(self._msg))
832 body = Data(pn_message_body(self._msg))
833
834 if inst.next():
835 self.instructions = inst.get_object()
836 else:
837 self.instructions = None
838 if ann.next():
839 self.annotations = ann.get_object()
840 else:
841 self.annotations = None
842 if props.next():
843 self.properties = props.get_object()
844 else:
845 self.properties = None
846 if body.next():
847 self.body = body.get_object()
848 else:
849 self.body = None
850
852 """
853 Clears the contents of the L{Message}. All fields will be reset to
854 their default values.
855 """
856 pn_message_clear(self._msg)
857 self.instructions = None
858 self.annotations = None
859 self.properties = None
860 self.body = None
861
863 return pn_message_is_inferred(self._msg)
864
866 self._check(pn_message_set_inferred(self._msg, bool(value)))
867
868 inferred = property(_is_inferred, _set_inferred, doc="""
869 The inferred flag for a message indicates how the message content
870 is encoded into AMQP sections. If inferred is true then binary and
871 list values in the body of the message will be encoded as AMQP DATA
872 and AMQP SEQUENCE sections, respectively. If inferred is false,
873 then all values in the body of the message will be encoded as AMQP
874 VALUE sections regardless of their type.
875 """)
876
878 return pn_message_is_durable(self._msg)
879
881 self._check(pn_message_set_durable(self._msg, bool(value)))
882
883 durable = property(_is_durable, _set_durable,
884 doc="""
885 The durable property indicates that the message should be held durably
886 by any intermediaries taking responsibility for the message.
887 """)
888
890 return pn_message_get_priority(self._msg)
891
893 self._check(pn_message_set_priority(self._msg, value))
894
895 priority = property(_get_priority, _set_priority,
896 doc="""
897 The priority of the message.
898 """)
899
901 return millis2secs(pn_message_get_ttl(self._msg))
902
904 self._check(pn_message_set_ttl(self._msg, secs2millis(value)))
905
906 ttl = property(_get_ttl, _set_ttl,
907 doc="""
908 The time to live of the message measured in seconds. Expired messages
909 may be dropped.
910 """)
911
913 return pn_message_is_first_acquirer(self._msg)
914
916 self._check(pn_message_set_first_acquirer(self._msg, bool(value)))
917
918 first_acquirer = property(_is_first_acquirer, _set_first_acquirer,
919 doc="""
920 True iff the recipient is the first to acquire the message.
921 """)
922
924 return pn_message_get_delivery_count(self._msg)
925
927 self._check(pn_message_set_delivery_count(self._msg, value))
928
929 delivery_count = property(_get_delivery_count, _set_delivery_count,
930 doc="""
931 The number of delivery attempts made for this message.
932 """)
933
934
942 id = property(_get_id, _set_id,
943 doc="""
944 The id of the message.
945 """)
946
948 return pn_message_get_user_id(self._msg)
949
951 self._check(pn_message_set_user_id(self._msg, value))
952
953 user_id = property(_get_user_id, _set_user_id,
954 doc="""
955 The user id of the message creator.
956 """)
957
959 return utf82unicode(pn_message_get_address(self._msg))
960
962 self._check(pn_message_set_address(self._msg, unicode2utf8(value)))
963
964 address = property(_get_address, _set_address,
965 doc="""
966 The address of the message.
967 """)
968
970 return pn_message_get_subject(self._msg)
971
973 self._check(pn_message_set_subject(self._msg, value))
974
975 subject = property(_get_subject, _set_subject,
976 doc="""
977 The subject of the message.
978 """)
979
981 return utf82unicode(pn_message_get_reply_to(self._msg))
982
984 self._check(pn_message_set_reply_to(self._msg, unicode2utf8(value)))
985
986 reply_to = property(_get_reply_to, _set_reply_to,
987 doc="""
988 The reply-to address for the message.
989 """)
990
994 if type(value) in (int, long):
995 value = ulong(value)
996 self._correlation_id.rewind()
997 self._correlation_id.put_object(value)
998
999 correlation_id = property(_get_correlation_id, _set_correlation_id,
1000 doc="""
1001 The correlation-id for the message.
1002 """)
1003
1005 return pn_message_get_content_type(self._msg)
1006
1007 - def _set_content_type(self, value):
1008 self._check(pn_message_set_content_type(self._msg, value))
1009
1010 content_type = property(_get_content_type, _set_content_type,
1011 doc="""
1012 The content-type of the message.
1013 """)
1014
1016 return pn_message_get_content_encoding(self._msg)
1017
1018 - def _set_content_encoding(self, value):
1019 self._check(pn_message_set_content_encoding(self._msg, value))
1020
1021 content_encoding = property(_get_content_encoding, _set_content_encoding,
1022 doc="""
1023 The content-encoding of the message.
1024 """)
1025
1027 return millis2secs(pn_message_get_expiry_time(self._msg))
1028
1030 self._check(pn_message_set_expiry_time(self._msg, secs2millis(value)))
1031
1032 expiry_time = property(_get_expiry_time, _set_expiry_time,
1033 doc="""
1034 The expiry time of the message.
1035 """)
1036
1038 return millis2secs(pn_message_get_creation_time(self._msg))
1039
1041 self._check(pn_message_set_creation_time(self._msg, secs2millis(value)))
1042
1043 creation_time = property(_get_creation_time, _set_creation_time,
1044 doc="""
1045 The creation time of the message.
1046 """)
1047
1049 return pn_message_get_group_id(self._msg)
1050
1052 self._check(pn_message_set_group_id(self._msg, value))
1053
1054 group_id = property(_get_group_id, _set_group_id,
1055 doc="""
1056 The group id of the message.
1057 """)
1058
1060 return pn_message_get_group_sequence(self._msg)
1061
1063 self._check(pn_message_set_group_sequence(self._msg, value))
1064
1065 group_sequence = property(_get_group_sequence, _set_group_sequence,
1066 doc="""
1067 The sequence of the message within its group.
1068 """)
1069
1071 return pn_message_get_reply_to_group_id(self._msg)
1072
1074 self._check(pn_message_set_reply_to_group_id(self._msg, value))
1075
1076 reply_to_group_id = property(_get_reply_to_group_id, _set_reply_to_group_id,
1077 doc="""
1078 The group-id for any replies.
1079 """)
1080
1082 self._pre_encode()
1083 sz = 16
1084 while True:
1085 err, data = pn_message_encode(self._msg, sz)
1086 if err == PN_OVERFLOW:
1087 sz *= 2
1088 continue
1089 else:
1090 self._check(err)
1091 return data
1092
1094 self._check(pn_message_decode(self._msg, data, len(data)))
1095 self._post_decode()
1096
1097 - def send(self, sender, tag=None):
1105
1106 - def recv(self, link):
1107 """
1108 Receives and decodes the message content for the current delivery
1109 from the link. Upon success it will return the current delivery
1110 for the link. If there is no current delivery, or if the current
1111 delivery is incomplete, or if the link is not a receiver, it will
1112 return None.
1113
1114 @type link: Link
1115 @param link: the link to receive a message from
1116 @return the delivery associated with the decoded message (or None)
1117
1118 """
1119 if link.is_sender: return None
1120 dlv = link.current
1121 if not dlv or dlv.partial: return None
1122 encoded = link.recv(dlv.pending)
1123 link.advance()
1124
1125
1126 if link.remote_snd_settle_mode == Link.SND_SETTLED:
1127 dlv.settle()
1128 self.decode(encoded)
1129 return dlv
1130
1132 props = []
1133 for attr in ("inferred", "address", "reply_to", "durable", "ttl",
1134 "priority", "first_acquirer", "delivery_count", "id",
1135 "correlation_id", "user_id", "group_id", "group_sequence",
1136 "reply_to_group_id", "instructions", "annotations",
1137 "properties", "body"):
1138 value = getattr(self, attr)
1139 if value: props.append("%s=%r" % (attr, value))
1140 return "Message(%s)" % ", ".join(props)
1141
1143 tmp = pn_string(None)
1144 err = pn_inspect(self._msg, tmp)
1145 result = pn_string_get(tmp)
1146 pn_free(tmp)
1147 self._check(err)
1148 return result
1149
1151
1154
1155 @property
1157 return pn_subscription_address(self._impl)
1158
1159 _DEFAULT = object()
1162
1163 @staticmethod
1165 if impl is None:
1166 return None
1167 else:
1168 return Selectable(impl)
1169
1172
1175
1177 if fd is _DEFAULT:
1178 return pn_selectable_get_fd(self._impl)
1179 elif fd is None:
1180 pn_selectable_set_fd(self._impl, PN_INVALID_SOCKET)
1181 else:
1182 pn_selectable_set_fd(self._impl, fd)
1183
1185 return pn_selectable_is_reading(self._impl)
1186
1188 pn_selectable_set_reading(self._impl, bool(val))
1189
1190 reading = property(_is_reading, _set_reading)
1191
1193 return pn_selectable_is_writing(self._impl)
1194
1196 pn_selectable_set_writing(self._impl, bool(val))
1197
1198 writing = property(_is_writing, _set_writing)
1199
1201 tstamp = pn_selectable_get_deadline(self._impl)
1202 if tstamp:
1203 return millis2secs(tstamp)
1204 else:
1205 return None
1206
1208 pn_selectable_set_deadline(self._impl, secs2millis(deadline))
1209
1210 deadline = property(_get_deadline, _set_deadline)
1211
1213 pn_selectable_readable(self._impl)
1214
1216 pn_selectable_writable(self._impl)
1217
1219 pn_selectable_expired(self._impl)
1220
1222 return pn_selectable_is_registered(self._impl)
1223
1225 pn_selectable_set_registered(self._impl, registered)
1226
1227 registered = property(_is_registered, _set_registered,
1228 doc="""
1229 The registered property may be get/set by an I/O polling system to
1230 indicate whether the fd has been registered or not.
1231 """)
1232
1233 @property
1235 return pn_selectable_is_terminal(self._impl)
1236
1238 pn_selectable_terminate(self._impl)
1239
1241 pn_selectable_release(self._impl)
1242
1244 """
1245 The DataException class is the root of the Data exception hierarchy.
1246 All exceptions raised by the Data class extend this exception.
1247 """
1248 pass
1249
1251
1254
1256 return "UnmappedType(%s)" % self.msg
1257
1259
1261 return "ulong(%s)" % long.__repr__(self)
1262
1264
1266 return "timestamp(%s)" % long.__repr__(self)
1267
1269
1271 return "symbol(%s)" % unicode.__repr__(self)
1272
1273 -class char(unicode):
1274
1276 return "char(%s)" % unicode.__repr__(self)
1277
1279
1280 - def __init__(self, descriptor, value):
1281 self.descriptor = descriptor
1282 self.value = value
1283
1285 return "Described(%r, %r)" % (self.descriptor, self.value)
1286
1288 if isinstance(o, Described):
1289 return self.descriptor == o.descriptor and self.value == o.value
1290 else:
1291 return False
1292
1293 UNDESCRIBED = Constant("UNDESCRIBED")
1294
1295 -class Array(object):
1296
1297 - def __init__(self, descriptor, type, *elements):
1298 self.descriptor = descriptor
1299 self.type = type
1300 self.elements = elements
1301
1303 if self.elements:
1304 els = ", %s" % (", ".join(map(repr, self.elements)))
1305 else:
1306 els = ""
1307 return "Array(%r, %r%s)" % (self.descriptor, self.type, els)
1308
1310 if isinstance(o, Array):
1311 return self.descriptor == o.descriptor and \
1312 self.type == o.type and self.elements == o.elements
1313 else:
1314 return False
1315
1317 """
1318 The L{Data} class provides an interface for decoding, extracting,
1319 creating, and encoding arbitrary AMQP data. A L{Data} object
1320 contains a tree of AMQP values. Leaf nodes in this tree correspond
1321 to scalars in the AMQP type system such as L{ints<INT>} or
1322 L{strings<STRING>}. Non-leaf nodes in this tree correspond to
1323 compound values in the AMQP type system such as L{lists<LIST>},
1324 L{maps<MAP>}, L{arrays<ARRAY>}, or L{described values<DESCRIBED>}.
1325 The root node of the tree is the L{Data} object itself and can have
1326 an arbitrary number of children.
1327
1328 A L{Data} object maintains the notion of the current sibling node
1329 and a current parent node. Siblings are ordered within their parent.
1330 Values are accessed and/or added by using the L{next}, L{prev},
1331 L{enter}, and L{exit} methods to navigate to the desired location in
1332 the tree and using the supplied variety of put_*/get_* methods to
1333 access or add a value of the desired type.
1334
1335 The put_* methods will always add a value I{after} the current node
1336 in the tree. If the current node has a next sibling the put_* method
1337 will overwrite the value on this node. If there is no current node
1338 or the current node has no next sibling then one will be added. The
1339 put_* methods always set the added/modified node to the current
1340 node. The get_* methods read the value of the current node and do
1341 not change which node is current.
1342
1343 The following types of scalar values are supported:
1344
1345 - L{NULL}
1346 - L{BOOL}
1347 - L{UBYTE}
1348 - L{USHORT}
1349 - L{SHORT}
1350 - L{UINT}
1351 - L{INT}
1352 - L{ULONG}
1353 - L{LONG}
1354 - L{FLOAT}
1355 - L{DOUBLE}
1356 - L{BINARY}
1357 - L{STRING}
1358 - L{SYMBOL}
1359
1360 The following types of compound values are supported:
1361
1362 - L{DESCRIBED}
1363 - L{ARRAY}
1364 - L{LIST}
1365 - L{MAP}
1366 """
1367
1368 NULL = PN_NULL; "A null value."
1369 BOOL = PN_BOOL; "A boolean value."
1370 UBYTE = PN_UBYTE; "An unsigned byte value."
1371 BYTE = PN_BYTE; "A signed byte value."
1372 USHORT = PN_USHORT; "An unsigned short value."
1373 SHORT = PN_SHORT; "A short value."
1374 UINT = PN_UINT; "An unsigned int value."
1375 INT = PN_INT; "A signed int value."
1376 CHAR = PN_CHAR; "A character value."
1377 ULONG = PN_ULONG; "An unsigned long value."
1378 LONG = PN_LONG; "A signed long value."
1379 TIMESTAMP = PN_TIMESTAMP; "A timestamp value."
1380 FLOAT = PN_FLOAT; "A float value."
1381 DOUBLE = PN_DOUBLE; "A double value."
1382 DECIMAL32 = PN_DECIMAL32; "A DECIMAL32 value."
1383 DECIMAL64 = PN_DECIMAL64; "A DECIMAL64 value."
1384 DECIMAL128 = PN_DECIMAL128; "A DECIMAL128 value."
1385 UUID = PN_UUID; "A UUID value."
1386 BINARY = PN_BINARY; "A binary string."
1387 STRING = PN_STRING; "A unicode string."
1388 SYMBOL = PN_SYMBOL; "A symbolic string."
1389 DESCRIBED = PN_DESCRIBED; "A described value."
1390 ARRAY = PN_ARRAY; "An array value."
1391 LIST = PN_LIST; "A list value."
1392 MAP = PN_MAP; "A map value."
1393
1394 type_names = {
1395 NULL: "null",
1396 BOOL: "bool",
1397 BYTE: "byte",
1398 UBYTE: "ubyte",
1399 SHORT: "short",
1400 USHORT: "ushort",
1401 INT: "int",
1402 UINT: "uint",
1403 CHAR: "char",
1404 LONG: "long",
1405 ULONG: "ulong",
1406 TIMESTAMP: "timestamp",
1407 FLOAT: "float",
1408 DOUBLE: "double",
1409 DECIMAL32: "decimal32",
1410 DECIMAL64: "decimal64",
1411 DECIMAL128: "decimal128",
1412 UUID: "uuid",
1413 BINARY: "binary",
1414 STRING: "string",
1415 SYMBOL: "symbol",
1416 DESCRIBED: "described",
1417 ARRAY: "array",
1418 LIST: "list",
1419 MAP: "map"
1420 }
1421
1422 @classmethod
1424
1426 if type(capacity) in (int, long):
1427 self._data = pn_data(capacity)
1428 self._free = True
1429 else:
1430 self._data = capacity
1431 self._free = False
1432
1434 if self._free and hasattr(self, "_data"):
1435 pn_data_free(self._data)
1436 del self._data
1437
1439 if err < 0:
1440 exc = EXCEPTIONS.get(err, DataException)
1441 raise exc("[%s]: %s" % (err, pn_error_text(pn_data_error(self._data))))
1442 else:
1443 return err
1444
1446 """
1447 Clears the data object.
1448 """
1449 pn_data_clear(self._data)
1450
1452 """
1453 Clears current node and sets the parent to the root node. Clearing the
1454 current node sets it _before_ the first node, calling next() will advance to
1455 the first node.
1456 """
1457 assert self._data is not None
1458 pn_data_rewind(self._data)
1459
1461 """
1462 Advances the current node to its next sibling and returns its
1463 type. If there is no next sibling the current node remains
1464 unchanged and None is returned.
1465 """
1466 found = pn_data_next(self._data)
1467 if found:
1468 return self.type()
1469 else:
1470 return None
1471
1473 """
1474 Advances the current node to its previous sibling and returns its
1475 type. If there is no previous sibling the current node remains
1476 unchanged and None is returned.
1477 """
1478 found = pn_data_prev(self._data)
1479 if found:
1480 return self.type()
1481 else:
1482 return None
1483
1485 """
1486 Sets the parent node to the current node and clears the current node.
1487 Clearing the current node sets it _before_ the first child,
1488 call next() advances to the first child.
1489 """
1490 return pn_data_enter(self._data)
1491
1493 """
1494 Sets the current node to the parent node and the parent node to
1495 its own parent.
1496 """
1497 return pn_data_exit(self._data)
1498
1500 return pn_data_lookup(self._data, name)
1501
1503 pn_data_narrow(self._data)
1504
1506 pn_data_widen(self._data)
1507
1509 """
1510 Returns the type of the current node.
1511 """
1512 dtype = pn_data_type(self._data)
1513 if dtype == -1:
1514 return None
1515 else:
1516 return dtype
1517
1519 """
1520 Returns a representation of the data encoded in AMQP format.
1521 """
1522 size = 1024
1523 while True:
1524 cd, enc = pn_data_encode(self._data, size)
1525 if cd == PN_OVERFLOW:
1526 size *= 2
1527 elif cd >= 0:
1528 return enc
1529 else:
1530 self._check(cd)
1531
1533 """
1534 Decodes the first value from supplied AMQP data and returns the
1535 number of bytes consumed.
1536
1537 @type encoded: binary
1538 @param encoded: AMQP encoded binary data
1539 """
1540 return self._check(pn_data_decode(self._data, encoded))
1541
1543 """
1544 Puts a list value. Elements may be filled by entering the list
1545 node and putting element values.
1546
1547 >>> data = Data()
1548 >>> data.put_list()
1549 >>> data.enter()
1550 >>> data.put_int(1)
1551 >>> data.put_int(2)
1552 >>> data.put_int(3)
1553 >>> data.exit()
1554 """
1555 self._check(pn_data_put_list(self._data))
1556
1558 """
1559 Puts a map value. Elements may be filled by entering the map node
1560 and putting alternating key value pairs.
1561
1562 >>> data = Data()
1563 >>> data.put_map()
1564 >>> data.enter()
1565 >>> data.put_string("key")
1566 >>> data.put_string("value")
1567 >>> data.exit()
1568 """
1569 self._check(pn_data_put_map(self._data))
1570
1571 - def put_array(self, described, element_type):
1572 """
1573 Puts an array value. Elements may be filled by entering the array
1574 node and putting the element values. The values must all be of the
1575 specified array element type. If an array is described then the
1576 first child value of the array is the descriptor and may be of any
1577 type.
1578
1579 >>> data = Data()
1580 >>>
1581 >>> data.put_array(False, Data.INT)
1582 >>> data.enter()
1583 >>> data.put_int(1)
1584 >>> data.put_int(2)
1585 >>> data.put_int(3)
1586 >>> data.exit()
1587 >>>
1588 >>> data.put_array(True, Data.DOUBLE)
1589 >>> data.enter()
1590 >>> data.put_symbol("array-descriptor")
1591 >>> data.put_double(1.1)
1592 >>> data.put_double(1.2)
1593 >>> data.put_double(1.3)
1594 >>> data.exit()
1595
1596 @type described: bool
1597 @param described: specifies whether the array is described
1598 @type element_type: int
1599 @param element_type: the type of the array elements
1600 """
1601 self._check(pn_data_put_array(self._data, described, element_type))
1602
1604 """
1605 Puts a described value. A described node has two children, the
1606 descriptor and the value. These are specified by entering the node
1607 and putting the desired values.
1608
1609 >>> data = Data()
1610 >>> data.put_described()
1611 >>> data.enter()
1612 >>> data.put_symbol("value-descriptor")
1613 >>> data.put_string("the value")
1614 >>> data.exit()
1615 """
1616 self._check(pn_data_put_described(self._data))
1617
1619 """
1620 Puts a null value.
1621 """
1622 self._check(pn_data_put_null(self._data))
1623
1625 """
1626 Puts a boolean value.
1627
1628 @param b: a boolean value
1629 """
1630 self._check(pn_data_put_bool(self._data, b))
1631
1633 """
1634 Puts an unsigned byte value.
1635
1636 @param ub: an integral value
1637 """
1638 self._check(pn_data_put_ubyte(self._data, ub))
1639
1641 """
1642 Puts a signed byte value.
1643
1644 @param b: an integral value
1645 """
1646 self._check(pn_data_put_byte(self._data, b))
1647
1649 """
1650 Puts an unsigned short value.
1651
1652 @param us: an integral value.
1653 """
1654 self._check(pn_data_put_ushort(self._data, us))
1655
1657 """
1658 Puts a signed short value.
1659
1660 @param s: an integral value
1661 """
1662 self._check(pn_data_put_short(self._data, s))
1663
1665 """
1666 Puts an unsigned int value.
1667
1668 @param ui: an integral value
1669 """
1670 self._check(pn_data_put_uint(self._data, ui))
1671
1673 """
1674 Puts a signed int value.
1675
1676 @param i: an integral value
1677 """
1678 self._check(pn_data_put_int(self._data, i))
1679
1681 """
1682 Puts a char value.
1683
1684 @param c: a single character
1685 """
1686 self._check(pn_data_put_char(self._data, ord(c)))
1687
1689 """
1690 Puts an unsigned long value.
1691
1692 @param ul: an integral value
1693 """
1694 self._check(pn_data_put_ulong(self._data, ul))
1695
1697 """
1698 Puts a signed long value.
1699
1700 @param l: an integral value
1701 """
1702 self._check(pn_data_put_long(self._data, l))
1703
1705 """
1706 Puts a timestamp value.
1707
1708 @param t: an integral value
1709 """
1710 self._check(pn_data_put_timestamp(self._data, t))
1711
1713 """
1714 Puts a float value.
1715
1716 @param f: a floating point value
1717 """
1718 self._check(pn_data_put_float(self._data, f))
1719
1721 """
1722 Puts a double value.
1723
1724 @param d: a floating point value.
1725 """
1726 self._check(pn_data_put_double(self._data, d))
1727
1729 """
1730 Puts a decimal32 value.
1731
1732 @param d: a decimal32 value
1733 """
1734 self._check(pn_data_put_decimal32(self._data, d))
1735
1737 """
1738 Puts a decimal64 value.
1739
1740 @param d: a decimal64 value
1741 """
1742 self._check(pn_data_put_decimal64(self._data, d))
1743
1745 """
1746 Puts a decimal128 value.
1747
1748 @param d: a decimal128 value
1749 """
1750 self._check(pn_data_put_decimal128(self._data, d))
1751
1753 """
1754 Puts a UUID value.
1755
1756 @param u: a uuid value
1757 """
1758 self._check(pn_data_put_uuid(self._data, u.bytes))
1759
1761 """
1762 Puts a binary value.
1763
1764 @type b: binary
1765 @param b: a binary value
1766 """
1767 self._check(pn_data_put_binary(self._data, b))
1768
1770 """
1771 Puts a unicode value.
1772
1773 @type s: unicode
1774 @param s: a unicode value
1775 """
1776 self._check(pn_data_put_string(self._data, s.encode("utf8")))
1777
1779 """
1780 Puts a symbolic value.
1781
1782 @type s: string
1783 @param s: the symbol name
1784 """
1785 self._check(pn_data_put_symbol(self._data, s))
1786
1788 """
1789 If the current node is a list, return the number of elements,
1790 otherwise return zero. List elements can be accessed by entering
1791 the list.
1792
1793 >>> count = data.get_list()
1794 >>> data.enter()
1795 >>> for i in range(count):
1796 ... type = data.next()
1797 ... if type == Data.STRING:
1798 ... print data.get_string()
1799 ... elif type == ...:
1800 ... ...
1801 >>> data.exit()
1802 """
1803 return pn_data_get_list(self._data)
1804
1806 """
1807 If the current node is a map, return the number of child elements,
1808 otherwise return zero. Key value pairs can be accessed by entering
1809 the map.
1810
1811 >>> count = data.get_map()
1812 >>> data.enter()
1813 >>> for i in range(count/2):
1814 ... type = data.next()
1815 ... if type == Data.STRING:
1816 ... print data.get_string()
1817 ... elif type == ...:
1818 ... ...
1819 >>> data.exit()
1820 """
1821 return pn_data_get_map(self._data)
1822
1824 """
1825 If the current node is an array, return a tuple of the element
1826 count, a boolean indicating whether the array is described, and
1827 the type of each element, otherwise return (0, False, None). Array
1828 data can be accessed by entering the array.
1829
1830 >>> # read an array of strings with a symbolic descriptor
1831 >>> count, described, type = data.get_array()
1832 >>> data.enter()
1833 >>> data.next()
1834 >>> print "Descriptor:", data.get_symbol()
1835 >>> for i in range(count):
1836 ... data.next()
1837 ... print "Element:", data.get_string()
1838 >>> data.exit()
1839 """
1840 count = pn_data_get_array(self._data)
1841 described = pn_data_is_array_described(self._data)
1842 type = pn_data_get_array_type(self._data)
1843 if type == -1:
1844 type = None
1845 return count, described, type
1846
1848 """
1849 Checks if the current node is a described value. The descriptor
1850 and value may be accessed by entering the described value.
1851
1852 >>> # read a symbolically described string
1853 >>> assert data.is_described() # will error if the current node is not described
1854 >>> data.enter()
1855 >>> data.next()
1856 >>> print data.get_symbol()
1857 >>> data.next()
1858 >>> print data.get_string()
1859 >>> data.exit()
1860 """
1861 return pn_data_is_described(self._data)
1862
1864 """
1865 Checks if the current node is a null.
1866 """
1867 return pn_data_is_null(self._data)
1868
1870 """
1871 If the current node is a boolean, returns its value, returns False
1872 otherwise.
1873 """
1874 return pn_data_get_bool(self._data)
1875
1877 """
1878 If the current node is an unsigned byte, returns its value,
1879 returns 0 otherwise.
1880 """
1881 return pn_data_get_ubyte(self._data)
1882
1884 """
1885 If the current node is a signed byte, returns its value, returns 0
1886 otherwise.
1887 """
1888 return pn_data_get_byte(self._data)
1889
1891 """
1892 If the current node is an unsigned short, returns its value,
1893 returns 0 otherwise.
1894 """
1895 return pn_data_get_ushort(self._data)
1896
1898 """
1899 If the current node is a signed short, returns its value, returns
1900 0 otherwise.
1901 """
1902 return pn_data_get_short(self._data)
1903
1905 """
1906 If the current node is an unsigned int, returns its value, returns
1907 0 otherwise.
1908 """
1909 return pn_data_get_uint(self._data)
1910
1912 """
1913 If the current node is a signed int, returns its value, returns 0
1914 otherwise.
1915 """
1916 return pn_data_get_int(self._data)
1917
1919 """
1920 If the current node is a char, returns its value, returns 0
1921 otherwise.
1922 """
1923 return char(unichr(pn_data_get_char(self._data)))
1924
1926 """
1927 If the current node is an unsigned long, returns its value,
1928 returns 0 otherwise.
1929 """
1930 return ulong(pn_data_get_ulong(self._data))
1931
1933 """
1934 If the current node is an signed long, returns its value, returns
1935 0 otherwise.
1936 """
1937 return pn_data_get_long(self._data)
1938
1940 """
1941 If the current node is a timestamp, returns its value, returns 0
1942 otherwise.
1943 """
1944 return timestamp(pn_data_get_timestamp(self._data))
1945
1947 """
1948 If the current node is a float, returns its value, raises 0
1949 otherwise.
1950 """
1951 return pn_data_get_float(self._data)
1952
1954 """
1955 If the current node is a double, returns its value, returns 0
1956 otherwise.
1957 """
1958 return pn_data_get_double(self._data)
1959
1960
1962 """
1963 If the current node is a decimal32, returns its value, returns 0
1964 otherwise.
1965 """
1966 return pn_data_get_decimal32(self._data)
1967
1968
1970 """
1971 If the current node is a decimal64, returns its value, returns 0
1972 otherwise.
1973 """
1974 return pn_data_get_decimal64(self._data)
1975
1976
1978 """
1979 If the current node is a decimal128, returns its value, returns 0
1980 otherwise.
1981 """
1982 return pn_data_get_decimal128(self._data)
1983
1985 """
1986 If the current node is a UUID, returns its value, returns None
1987 otherwise.
1988 """
1989 if pn_data_type(self._data) == Data.UUID:
1990 return uuid.UUID(bytes=pn_data_get_uuid(self._data))
1991 else:
1992 return None
1993
1995 """
1996 If the current node is binary, returns its value, returns ""
1997 otherwise.
1998 """
1999 return pn_data_get_binary(self._data)
2000
2002 """
2003 If the current node is a string, returns its value, returns ""
2004 otherwise.
2005 """
2006 return pn_data_get_string(self._data).decode("utf8")
2007
2009 """
2010 If the current node is a symbol, returns its value, returns ""
2011 otherwise.
2012 """
2013 return symbol(pn_data_get_symbol(self._data))
2014
2015 - def copy(self, src):
2016 self._check(pn_data_copy(self._data, src._data))
2017
2028
2030 pn_data_dump(self._data)
2031
2041
2043 if self.enter():
2044 try:
2045 result = {}
2046 while self.next():
2047 k = self.get_object()
2048 if self.next():
2049 v = self.get_object()
2050 else:
2051 v = None
2052 result[k] = v
2053 finally:
2054 self.exit()
2055 return result
2056
2065
2067 if self.enter():
2068 try:
2069 result = []
2070 while self.next():
2071 result.append(self.get_object())
2072 finally:
2073 self.exit()
2074 return result
2075
2086
2095
2097 """
2098 If the current node is an array, return an Array object
2099 representing the array and its contents. Otherwise return None.
2100 This is a convenience wrapper around get_array, enter, etc.
2101 """
2102
2103 count, described, type = self.get_array()
2104 if type is None: return None
2105 if self.enter():
2106 try:
2107 if described:
2108 self.next()
2109 descriptor = self.get_object()
2110 else:
2111 descriptor = UNDESCRIBED
2112 elements = []
2113 while self.next():
2114 elements.append(self.get_object())
2115 finally:
2116 self.exit()
2117 return Array(descriptor, type, *elements)
2118
2130
2131 put_mappings = {
2132 None.__class__: lambda s, _: s.put_null(),
2133 bool: put_bool,
2134 dict: put_dict,
2135 list: put_sequence,
2136 tuple: put_sequence,
2137 unicode: put_string,
2138 bytes: put_binary,
2139 symbol: put_symbol,
2140 int: put_long,
2141 char: put_char,
2142 long: put_long,
2143 ulong: put_ulong,
2144 timestamp: put_timestamp,
2145 float: put_double,
2146 uuid.UUID: put_uuid,
2147 Described: put_py_described,
2148 Array: put_py_array
2149 }
2150 get_mappings = {
2151 NULL: lambda s: None,
2152 BOOL: get_bool,
2153 BYTE: get_byte,
2154 UBYTE: get_ubyte,
2155 SHORT: get_short,
2156 USHORT: get_ushort,
2157 INT: get_int,
2158 UINT: get_uint,
2159 CHAR: get_char,
2160 LONG: get_long,
2161 ULONG: get_ulong,
2162 TIMESTAMP: get_timestamp,
2163 FLOAT: get_float,
2164 DOUBLE: get_double,
2165 DECIMAL32: get_decimal32,
2166 DECIMAL64: get_decimal64,
2167 DECIMAL128: get_decimal128,
2168 UUID: get_uuid,
2169 BINARY: get_binary,
2170 STRING: get_string,
2171 SYMBOL: get_symbol,
2172 DESCRIBED: get_py_described,
2173 ARRAY: get_py_array,
2174 LIST: get_sequence,
2175 MAP: get_dict
2176 }
2177
2178
2180 putter = self.put_mappings[obj.__class__]
2181 putter(self, obj)
2182
2184 type = self.type()
2185 if type is None: return None
2186 getter = self.get_mappings.get(type)
2187 if getter:
2188 return getter(self)
2189 else:
2190 return UnmappedType(str(type))
2191
2194
2248
2250
2251 - def __init__(self, name, description=None, info=None):
2252 self.name = name
2253 self.description = description
2254 self.info = info
2255
2257 return "Condition(%s)" % ", ".join([repr(x) for x in
2258 (self.name, self.description, self.info)
2259 if x])
2260
2262 if not isinstance(o, Condition): return False
2263 return self.name == o.name and \
2264 self.description == o.description and \
2265 self.info == o.info
2266
2268 pn_condition_clear(cond)
2269 if obj:
2270 pn_condition_set_name(cond, str(obj.name))
2271 pn_condition_set_description(cond, obj.description)
2272 info = Data(pn_condition_info(cond))
2273 if obj.info:
2274 info.put_object(obj.info)
2275
2277 if pn_condition_is_set(cond):
2278 return Condition(pn_condition_get_name(cond),
2279 pn_condition_get_description(cond),
2280 dat2obj(pn_condition_info(cond)))
2281 else:
2282 return None
2283
2292
2297
2299 return long(secs*1000)
2300
2302 return float(millis)/1000.0
2303
2305 if secs is None: return PN_MILLIS_MAX
2306 return secs2millis(secs)
2307
2309 if millis == PN_MILLIS_MAX: return None
2310 return millis2secs(millis)
2311
2313 if string is None:
2314 return None
2315 if isinstance(string, unicode):
2316 return string.encode('utf8')
2317 elif isinstance(string, str):
2318 return string
2319 else:
2320 raise TypeError("Unrecognized string type: %r" % string)
2321
2323 if string is None:
2324 return None
2325 if isinstance(string, unicode):
2326 return string
2327 elif isinstance(string, str):
2328 return string.decode('utf8')
2329 else:
2330 raise TypeError("Unrecognized string type")
2331
2333 """
2334 A representation of an AMQP connection
2335 """
2336
2337 @staticmethod
2339 if impl is None:
2340 return None
2341 else:
2342 return Connection(impl)
2343
2344 - def __init__(self, impl = pn_connection):
2346
2348 Endpoint._init(self)
2349 self.offered_capabilities = None
2350 self.desired_capabilities = None
2351 self.properties = None
2352
2354 return pn_connection_attachments(self._impl)
2355
2356 @property
2359
2360 @property
2363
2365 if err < 0:
2366 exc = EXCEPTIONS.get(err, ConnectionException)
2367 raise exc("[%s]: %s" % (err, pn_connection_error(self._impl)))
2368 else:
2369 return err
2370
2372 return pn_connection_condition(self._impl)
2373
2375 return pn_connection_remote_condition(self._impl)
2376
2378 if collector is None:
2379 pn_connection_collect(self._impl, None)
2380 else:
2381 pn_connection_collect(self._impl, collector._impl)
2382 self._collector = weakref.ref(collector)
2383
2385 return utf82unicode(pn_connection_get_container(self._impl))
2387 return pn_connection_set_container(self._impl, unicode2utf8(name))
2388
2389 container = property(_get_container, _set_container)
2390
2392 return utf82unicode(pn_connection_get_hostname(self._impl))
2394 return pn_connection_set_hostname(self._impl, unicode2utf8(name))
2395
2396 hostname = property(_get_hostname, _set_hostname)
2397
2398 @property
2400 """The container identifier specified by the remote peer for this connection."""
2401 return pn_connection_remote_container(self._impl)
2402
2403 @property
2405 """The hostname specified by the remote peer for this connection."""
2406 return pn_connection_remote_hostname(self._impl)
2407
2408 @property
2410 """The capabilities offered by the remote peer for this connection."""
2411 return dat2obj(pn_connection_remote_offered_capabilities(self._impl))
2412
2413 @property
2415 """The capabilities desired by the remote peer for this connection."""
2416 return dat2obj(pn_connection_remote_desired_capabilities(self._impl))
2417
2418 @property
2420 """The properties specified by the remote peer for this connection."""
2421 return dat2obj(pn_connection_remote_properties(self._impl))
2422
2424 """
2425 Opens the connection.
2426
2427 In more detail, this moves the local state of the connection to
2428 the ACTIVE state and triggers an open frame to be sent to the
2429 peer. A connection is fully active once both peers have opened it.
2430 """
2431 obj2dat(self.offered_capabilities,
2432 pn_connection_offered_capabilities(self._impl))
2433 obj2dat(self.desired_capabilities,
2434 pn_connection_desired_capabilities(self._impl))
2435 obj2dat(self.properties, pn_connection_properties(self._impl))
2436 pn_connection_open(self._impl)
2437
2439 """
2440 Closes the connection.
2441
2442 In more detail, this moves the local state of the connection to
2443 the CLOSED state and triggers a close frame to be sent to the
2444 peer. A connection is fully closed once both peers have closed it.
2445 """
2446 self._update_cond()
2447 pn_connection_close(self._impl)
2448
2449 @property
2451 """
2452 The state of the connection as a bit field. The state has a local
2453 and a remote component. Each of these can be in one of three
2454 states: UNINIT, ACTIVE or CLOSED. These can be tested by masking
2455 against LOCAL_UNINIT, LOCAL_ACTIVE, LOCAL_CLOSED, REMOTE_UNINIT,
2456 REMOTE_ACTIVE and REMOTE_CLOSED.
2457 """
2458 return pn_connection_state(self._impl)
2459
2461 """
2462 Returns a new session on this connection.
2463 """
2464 return Session(pn_session(self._impl))
2465
2467 return Session.wrap(pn_session_head(self._impl, mask))
2468
2470 return Link.wrap(pn_link_head(self._impl, mask))
2471
2472 @property
2475
2476 @property
2478 return pn_error_code(pn_connection_error(self._impl))
2479
2481 pn_connection_release(self._impl)
2482
2485
2487
2488 @staticmethod
2490 if impl is None:
2491 return None
2492 else:
2493 return Session(impl)
2494
2497
2499 return pn_session_attachments(self._impl)
2500
2502 return pn_session_condition(self._impl)
2503
2505 return pn_session_remote_condition(self._impl)
2506
2508 return pn_session_get_incoming_capacity(self._impl)
2509
2511 pn_session_set_incoming_capacity(self._impl, capacity)
2512
2513 incoming_capacity = property(_get_incoming_capacity, _set_incoming_capacity)
2514
2515 @property
2517 return pn_session_outgoing_bytes(self._impl)
2518
2519 @property
2521 return pn_session_incoming_bytes(self._impl)
2522
2524 pn_session_open(self._impl)
2525
2527 self._update_cond()
2528 pn_session_close(self._impl)
2529
2530 - def next(self, mask):
2531 return Session.wrap(pn_session_next(self._impl, mask))
2532
2533 @property
2535 return pn_session_state(self._impl)
2536
2537 @property
2540
2542 return Sender(pn_sender(self._impl, unicode2utf8(name)))
2543
2545 return Receiver(pn_receiver(self._impl, unicode2utf8(name)))
2546
2548 pn_session_free(self._impl)
2549
2552
2553 -class Link(Wrapper, Endpoint):
2554 """
2555 A representation of an AMQP link, of which there are two concrete
2556 implementations, Sender and Receiver.
2557 """
2558
2559 SND_UNSETTLED = PN_SND_UNSETTLED
2560 SND_SETTLED = PN_SND_SETTLED
2561 SND_MIXED = PN_SND_MIXED
2562
2563 RCV_FIRST = PN_RCV_FIRST
2564 RCV_SECOND = PN_RCV_SECOND
2565
2566 @staticmethod
2568 if impl is None: return None
2569 if pn_link_is_sender(impl):
2570 return Sender(impl)
2571 else:
2572 return Receiver(impl)
2573
2576
2578 return pn_link_attachments(self._impl)
2579
2581 if err < 0:
2582 exc = EXCEPTIONS.get(err, LinkException)
2583 raise exc("[%s]: %s" % (err, pn_error_text(pn_link_error(self._impl))))
2584 else:
2585 return err
2586
2588 return pn_link_condition(self._impl)
2589
2591 return pn_link_remote_condition(self._impl)
2592
2594 """
2595 Opens the link.
2596
2597 In more detail, this moves the local state of the link to the
2598 ACTIVE state and triggers an attach frame to be sent to the
2599 peer. A link is fully active once both peers have attached it.
2600 """
2601 pn_link_open(self._impl)
2602
2604 """
2605 Closes the link.
2606
2607 In more detail, this moves the local state of the link to the
2608 CLOSED state and triggers an detach frame (with the closed flag
2609 set) to be sent to the peer. A link is fully closed once both
2610 peers have detached it.
2611 """
2612 self._update_cond()
2613 pn_link_close(self._impl)
2614
2615 @property
2617 """
2618 The state of the link as a bit field. The state has a local
2619 and a remote component. Each of these can be in one of three
2620 states: UNINIT, ACTIVE or CLOSED. These can be tested by masking
2621 against LOCAL_UNINIT, LOCAL_ACTIVE, LOCAL_CLOSED, REMOTE_UNINIT,
2622 REMOTE_ACTIVE and REMOTE_CLOSED.
2623 """
2624 return pn_link_state(self._impl)
2625
2626 @property
2628 """The source of the link as described by the local peer."""
2629 return Terminus(pn_link_source(self._impl))
2630
2631 @property
2633 """The target of the link as described by the local peer."""
2634 return Terminus(pn_link_target(self._impl))
2635
2636 @property
2638 """The source of the link as described by the remote peer."""
2639 return Terminus(pn_link_remote_source(self._impl))
2640 @property
2642 """The target of the link as described by the remote peer."""
2643 return Terminus(pn_link_remote_target(self._impl))
2644
2645 @property
2648
2649 @property
2651 """The connection on which this link was attached."""
2652 return self.session.connection
2653
2656
2657 @property
2660
2662 return pn_link_advance(self._impl)
2663
2664 @property
2666 return pn_link_unsettled(self._impl)
2667
2668 @property
2670 """The amount of oustanding credit on this link."""
2671 return pn_link_credit(self._impl)
2672
2673 @property
2675 return pn_link_available(self._impl)
2676
2677 @property
2679 return pn_link_queued(self._impl)
2680
2681 - def next(self, mask):
2682 return Link.wrap(pn_link_next(self._impl, mask))
2683
2684 @property
2686 return utf82unicode(pn_link_name(self._impl))
2687
2688 @property
2690 """Returns true if this link is a sender."""
2691 return pn_link_is_sender(self._impl)
2692
2693 @property
2695 """Returns true if this link is a receiver."""
2696 return pn_link_is_receiver(self._impl)
2697
2698 @property
2700 return pn_link_remote_snd_settle_mode(self._impl)
2701
2702 @property
2704 return pn_link_remote_rcv_settle_mode(self._impl)
2705
2707 return pn_link_snd_settle_mode(self._impl)
2709 pn_link_set_snd_settle_mode(self._impl, mode)
2710 snd_settle_mode = property(_get_snd_settle_mode, _set_snd_settle_mode)
2711
2713 return pn_link_rcv_settle_mode(self._impl)
2715 pn_link_set_rcv_settle_mode(self._impl, mode)
2716 rcv_settle_mode = property(_get_rcv_settle_mode, _set_rcv_settle_mode)
2717
2719 return pn_link_get_drain(self._impl)
2720
2722 pn_link_set_drain(self._impl, bool(b))
2723
2724 drain_mode = property(_get_drain, _set_drain)
2725
2727 return pn_link_drained(self._impl)
2728
2730 return pn_link_detach(self._impl)
2731
2733 pn_link_free(self._impl)
2734
2736
2737 UNSPECIFIED = PN_UNSPECIFIED
2738 SOURCE = PN_SOURCE
2739 TARGET = PN_TARGET
2740 COORDINATOR = PN_COORDINATOR
2741
2742 NONDURABLE = PN_NONDURABLE
2743 CONFIGURATION = PN_CONFIGURATION
2744 DELIVERIES = PN_DELIVERIES
2745
2746 DIST_MODE_UNSPECIFIED = PN_DIST_MODE_UNSPECIFIED
2747 DIST_MODE_COPY = PN_DIST_MODE_COPY
2748 DIST_MODE_MOVE = PN_DIST_MODE_MOVE
2749
2752
2754 if err < 0:
2755 exc = EXCEPTIONS.get(err, LinkException)
2756 raise exc("[%s]" % err)
2757 else:
2758 return err
2759
2761 return pn_terminus_get_type(self._impl)
2763 self._check(pn_terminus_set_type(self._impl, type))
2764 type = property(_get_type, _set_type)
2765
2767 return utf82unicode(pn_terminus_get_address(self._impl))
2769 self._check(pn_terminus_set_address(self._impl, unicode2utf8(address)))
2770 address = property(_get_address, _set_address)
2771
2773 return pn_terminus_get_durability(self._impl)
2775 self._check(pn_terminus_set_durability(self._impl, seconds))
2776 durability = property(_get_durability, _set_durability)
2777
2779 return pn_terminus_get_expiry_policy(self._impl)
2781 self._check(pn_terminus_set_expiry_policy(self._impl, seconds))
2782 expiry_policy = property(_get_expiry_policy, _set_expiry_policy)
2783
2785 return pn_terminus_get_timeout(self._impl)
2787 self._check(pn_terminus_set_timeout(self._impl, seconds))
2788 timeout = property(_get_timeout, _set_timeout)
2789
2791 return pn_terminus_is_dynamic(self._impl)
2793 self._check(pn_terminus_set_dynamic(self._impl, dynamic))
2794 dynamic = property(_is_dynamic, _set_dynamic)
2795
2797 return pn_terminus_get_distribution_mode(self._impl)
2799 self._check(pn_terminus_set_distribution_mode(self._impl, mode))
2800 distribution_mode = property(_get_distribution_mode, _set_distribution_mode)
2801
2802 @property
2804 return Data(pn_terminus_properties(self._impl))
2805
2806 @property
2808 return Data(pn_terminus_capabilities(self._impl))
2809
2810 @property
2812 return Data(pn_terminus_outcomes(self._impl))
2813
2814 @property
2816 return Data(pn_terminus_filter(self._impl))
2817
2818 - def copy(self, src):
2819 self._check(pn_terminus_copy(self._impl, src._impl))
2820
2822 """
2823 A link over which messages are sent.
2824 """
2825
2827 pn_link_offered(self._impl, n)
2828
2830 """
2831 Send specified bytes as part of the current delivery
2832 """
2833 return self._check(pn_link_send(self._impl, bytes))
2834
2835 - def send(self, obj, tag=None):
2836 """
2837 Send specified object over this sender; the object is expected to
2838 have a send() method on it that takes the sender and an optional
2839 tag as arguments.
2840
2841 Where the object is a Message, this will send the message over
2842 this link, creating a new delivery for the purpose.
2843 """
2844 if hasattr(obj, 'send'):
2845 return obj.send(self, tag=tag)
2846 else:
2847
2848 return self.stream(obj)
2849
2851 if not hasattr(self, 'tag_generator'):
2852 def simple_tags():
2853 count = 1
2854 while True:
2855 yield str(count)
2856 count += 1
2857 self.tag_generator = simple_tags()
2858 return self.tag_generator.next()
2859
2861 """
2862 A link over which messages are received.
2863 """
2864
2865 - def flow(self, n):
2866 """Increases the credit issued to the remote sender by the specified number of messages."""
2867 pn_link_flow(self._impl, n)
2868
2869 - def recv(self, limit):
2870 n, bytes = pn_link_recv(self._impl, limit)
2871 if n == PN_EOS:
2872 return None
2873 else:
2874 self._check(n)
2875 return bytes
2876
2878 pn_link_drain(self._impl, n)
2879
2881 return pn_link_draining(self._impl)
2882
2884
2885 values = {}
2886
2888 ni = super(NamedInt, cls).__new__(cls, i)
2889 cls.values[i] = ni
2890 return ni
2891
2894
2897
2900
2901 @classmethod
2903 return cls.values.get(i, i)
2904
2907
2909
2910 RECEIVED = DispositionType(PN_RECEIVED, "RECEIVED")
2911 ACCEPTED = DispositionType(PN_ACCEPTED, "ACCEPTED")
2912 REJECTED = DispositionType(PN_REJECTED, "REJECTED")
2913 RELEASED = DispositionType(PN_RELEASED, "RELEASED")
2914 MODIFIED = DispositionType(PN_MODIFIED, "MODIFIED")
2915
2917 self._impl = impl
2918 self.local = local
2919 self._data = None
2920 self._condition = None
2921 self._annotations = None
2922
2923 @property
2925 return DispositionType.get(pn_disposition_type(self._impl))
2926
2928 return pn_disposition_get_section_number(self._impl)
2930 pn_disposition_set_section_number(self._impl, n)
2931 section_number = property(_get_section_number, _set_section_number)
2932
2934 return pn_disposition_get_section_offset(self._impl)
2936 pn_disposition_set_section_offset(self._impl, n)
2937 section_offset = property(_get_section_offset, _set_section_offset)
2938
2940 return pn_disposition_is_failed(self._impl)
2942 pn_disposition_set_failed(self._impl, b)
2943 failed = property(_get_failed, _set_failed)
2944
2946 return pn_disposition_is_undeliverable(self._impl)
2948 pn_disposition_set_undeliverable(self._impl, b)
2949 undeliverable = property(_get_undeliverable, _set_undeliverable)
2950
2952 if self.local:
2953 return self._data
2954 else:
2955 return dat2obj(pn_disposition_data(self._impl))
2957 if self.local:
2958 self._data = obj
2959 else:
2960 raise AttributeError("data attribute is read-only")
2961 data = property(_get_data, _set_data)
2962
2964 if self.local:
2965 return self._annotations
2966 else:
2967 return dat2obj(pn_disposition_annotations(self._impl))
2969 if self.local:
2970 self._annotations = obj
2971 else:
2972 raise AttributeError("annotations attribute is read-only")
2973 annotations = property(_get_annotations, _set_annotations)
2974
2976 if self.local:
2977 return self._condition
2978 else:
2979 return cond2obj(pn_disposition_condition(self._impl))
2981 if self.local:
2982 self._condition = obj
2983 else:
2984 raise AttributeError("condition attribute is read-only")
2985 condition = property(_get_condition, _set_condition)
2986
2988 """
2989 Tracks and/or records the delivery of a message over a link.
2990 """
2991
2992 RECEIVED = Disposition.RECEIVED
2993 ACCEPTED = Disposition.ACCEPTED
2994 REJECTED = Disposition.REJECTED
2995 RELEASED = Disposition.RELEASED
2996 MODIFIED = Disposition.MODIFIED
2997
2998 @staticmethod
3000 if impl is None:
3001 return None
3002 else:
3003 return Delivery(impl)
3004
3007
3009 self.local = Disposition(pn_delivery_local(self._impl), True)
3010 self.remote = Disposition(pn_delivery_remote(self._impl), False)
3011
3012 @property
3014 """The identifier for the delivery."""
3015 return pn_delivery_tag(self._impl)
3016
3017 @property
3019 """Returns true for an outgoing delivery to which data can now be written."""
3020 return pn_delivery_writable(self._impl)
3021
3022 @property
3024 """Returns true for an incoming delivery that has data to read."""
3025 return pn_delivery_readable(self._impl)
3026
3027 @property
3029 """Returns true if the state of the delivery has been updated
3030 (e.g. it has been settled and/or accepted, rejected etc)."""
3031 return pn_delivery_updated(self._impl)
3032
3034 """
3035 Set the local state of the delivery e.g. ACCEPTED, REJECTED, RELEASED.
3036 """
3037 obj2dat(self.local._data, pn_disposition_data(self.local._impl))
3038 obj2dat(self.local._annotations, pn_disposition_annotations(self.local._impl))
3039 obj2cond(self.local._condition, pn_disposition_condition(self.local._impl))
3040 pn_delivery_update(self._impl, state)
3041
3042 @property
3044 return pn_delivery_pending(self._impl)
3045
3046 @property
3048 """
3049 Returns true for an incoming delivery if not all the data is
3050 yet available.
3051 """
3052 return pn_delivery_partial(self._impl)
3053
3054 @property
3056 """Returns the local state of the delivery."""
3057 return DispositionType.get(pn_delivery_local_state(self._impl))
3058
3059 @property
3061 """
3062 Returns the state of the delivery as indicated by the remote
3063 peer.
3064 """
3065 return DispositionType.get(pn_delivery_remote_state(self._impl))
3066
3067 @property
3069 """
3070 Returns true if the delivery has been settled by the remote peer.
3071 """
3072 return pn_delivery_settled(self._impl)
3073
3075 """
3076 Settles the delivery locally. This indicates the aplication
3077 considers the delivery complete and does not wish to receive any
3078 further events about it. Every delivery should be settled locally.
3079 """
3080 pn_delivery_settle(self._impl)
3081
3082 @property
3085
3086 @property
3088 """
3089 Returns the link on which the delivery was sent or received.
3090 """
3091 return Link.wrap(pn_delivery_link(self._impl))
3092
3093 @property
3095 """
3096 Returns the session over which the delivery was sent or received.
3097 """
3098 return self.link.session
3099
3100 @property
3102 """
3103 Returns the connection over which the delivery was sent or received.
3104 """
3105 return self.session.connection
3106
3107 @property
3110
3113
3115
3116 TRACE_OFF = PN_TRACE_OFF
3117 TRACE_DRV = PN_TRACE_DRV
3118 TRACE_FRM = PN_TRACE_FRM
3119 TRACE_RAW = PN_TRACE_RAW
3120
3121 CLIENT = 1
3122 SERVER = 2
3123
3124 @staticmethod
3126 if impl is None:
3127 return None
3128 else:
3129 return Transport(_impl=impl)
3130
3131 - def __init__(self, mode=None, _impl = pn_transport):
3139
3141 self._sasl = None
3142 self._ssl = None
3143
3145 if err < 0:
3146 exc = EXCEPTIONS.get(err, TransportException)
3147 raise exc("[%s]: %s" % (err, pn_error_text(pn_transport_error(self._impl))))
3148 else:
3149 return err
3150
3151 - def bind(self, connection):
3152 """Assign a connection to the transport"""
3153 self._check(pn_transport_bind(self._impl, connection._impl))
3154
3156 """Release the connection"""
3157 self._check(pn_transport_unbind(self._impl))
3158
3160 pn_transport_trace(self._impl, n)
3161
3162 - def tick(self, now):
3163 """Process any timed events (like heartbeat generation).
3164 now = seconds since epoch (float).
3165 """
3166 return millis2secs(pn_transport_tick(self._impl, secs2millis(now)))
3167
3169 c = pn_transport_capacity(self._impl)
3170 if c >= PN_EOS:
3171 return c
3172 else:
3173 return self._check(c)
3174
3175 - def push(self, bytes):
3176 n = self._check(pn_transport_push(self._impl, bytes))
3177 if n != len(bytes):
3178 raise OverflowError("unable to process all bytes")
3179
3181 self._check(pn_transport_close_tail(self._impl))
3182
3184 p = pn_transport_pending(self._impl)
3185 if p >= PN_EOS:
3186 return p
3187 else:
3188 return self._check(p)
3189
3190 - def peek(self, size):
3191 cd, out = pn_transport_peek(self._impl, size)
3192 if cd == PN_EOS:
3193 return None
3194 else:
3195 self._check(cd)
3196 return out
3197
3198 - def pop(self, size):
3199 pn_transport_pop(self._impl, size)
3200
3202 self._check(pn_transport_close_head(self._impl))
3203
3204 @property
3206 return pn_transport_closed(self._impl)
3207
3208
3210 return pn_transport_get_max_frame(self._impl)
3211
3213 pn_transport_set_max_frame(self._impl, value)
3214
3215 max_frame_size = property(_get_max_frame_size, _set_max_frame_size,
3216 doc="""
3217 Sets the maximum size for received frames (in bytes).
3218 """)
3219
3220 @property
3222 return pn_transport_get_remote_max_frame(self._impl)
3223
3225 return pn_transport_get_channel_max(self._impl)
3226
3228 pn_transport_set_channel_max(self._impl, value)
3229
3230 channel_max = property(_get_channel_max, _set_channel_max,
3231 doc="""
3232 Sets the maximum channel that may be used on the transport.
3233 """)
3234
3235 @property
3237 return pn_transport_remote_channel_max(self._impl)
3238
3239
3241 return millis2secs(pn_transport_get_idle_timeout(self._impl))
3242
3244 pn_transport_set_idle_timeout(self._impl, secs2millis(sec))
3245
3246 idle_timeout = property(_get_idle_timeout, _set_idle_timeout,
3247 doc="""
3248 The idle timeout of the connection (float, in seconds).
3249 """)
3250
3251 @property
3253 return millis2secs(pn_transport_get_remote_idle_timeout(self._impl))
3254
3255 @property
3257 return pn_transport_get_frames_output(self._impl)
3258
3259 @property
3262
3265
3266 - def ssl(self, domain=None, session_details=None):
3267
3268 if not self._ssl:
3269 self._ssl = SSL(self, domain, session_details)
3270 return self._ssl
3271
3272 @property
3274 return cond2obj(pn_transport_condition(self._impl))
3275
3276 @property
3279
3282
3283 -class SASL(Wrapper):
3284
3285 OK = PN_SASL_OK
3286 AUTH = PN_SASL_AUTH
3287 SKIPPED = PN_SASL_SKIPPED
3288
3292
3294 if err < 0:
3295 exc = EXCEPTIONS.get(err, SASLException)
3296 raise exc("[%s]" % (err))
3297 else:
3298 return err
3299
3301 pn_sasl_mechanisms(self._sasl, mechs)
3302
3303
3305 pn_sasl_client(self._sasl)
3306
3307
3309 pn_sasl_server(self._sasl)
3310
3312 pn_sasl_allow_skip(self._sasl, allow)
3313
3314 - def plain(self, user, password):
3315 pn_sasl_plain(self._sasl, user, password)
3316
3317 - def send(self, data):
3318 self._check(pn_sasl_send(self._sasl, data, len(data)))
3319
3321 sz = 16
3322 while True:
3323 n, data = pn_sasl_recv(self._sasl, sz)
3324 if n == PN_OVERFLOW:
3325 sz *= 2
3326 continue
3327 elif n == PN_EOS:
3328 return None
3329 else:
3330 self._check(n)
3331 return data
3332
3333 @property
3335 outcome = pn_sasl_outcome(self._sasl)
3336 if outcome == PN_SASL_NONE:
3337 return None
3338 else:
3339 return outcome
3340
3341 - def done(self, outcome):
3342 pn_sasl_done(self._sasl, outcome)
3343
3344 STATE_IDLE = PN_SASL_IDLE
3345 STATE_STEP = PN_SASL_STEP
3346 STATE_PASS = PN_SASL_PASS
3347 STATE_FAIL = PN_SASL_FAIL
3348
3349 @property
3351 return pn_sasl_state(self._sasl)
3352
3356
3359
3360 -class SSLDomain(object):
3361
3362 MODE_CLIENT = PN_SSL_MODE_CLIENT
3363 MODE_SERVER = PN_SSL_MODE_SERVER
3364 VERIFY_PEER = PN_SSL_VERIFY_PEER
3365 VERIFY_PEER_NAME = PN_SSL_VERIFY_PEER_NAME
3366 ANONYMOUS_PEER = PN_SSL_ANONYMOUS_PEER
3367
3368 - def __init__(self, mode):
3369 self._domain = pn_ssl_domain(mode)
3370 if self._domain is None:
3371 raise SSLUnavailable()
3372
3373 - def _check(self, err):
3374 if err < 0:
3375 exc = EXCEPTIONS.get(err, SSLException)
3376 raise exc("SSL failure.")
3377 else:
3378 return err
3379
3380 - def set_credentials(self, cert_file, key_file, password):
3381 return self._check( pn_ssl_domain_set_credentials(self._domain,
3382 cert_file, key_file,
3383 password) )
3384 - def set_trusted_ca_db(self, certificate_db):
3385 return self._check( pn_ssl_domain_set_trusted_ca_db(self._domain,
3386 certificate_db) )
3387 - def set_peer_authentication(self, verify_mode, trusted_CAs=None):
3388 return self._check( pn_ssl_domain_set_peer_authentication(self._domain,
3389 verify_mode,
3390 trusted_CAs) )
3391
3393 return self._check( pn_ssl_domain_allow_unsecured_client(self._domain) )
3394
3395 - def __del__(self):
3396 pn_ssl_domain_free(self._domain)
3397
3399
3400 @staticmethod
3402 return pn_ssl_present()
3403
3410
3411 - def __new__(cls, transport, domain, session_details=None):
3412 """Enforce a singleton SSL object per Transport"""
3413 if transport._ssl:
3414
3415
3416
3417 ssl = transport._ssl
3418 if (domain and (ssl._domain is not domain) or
3419 session_details and (ssl._session_details is not session_details)):
3420 raise SSLException("Cannot re-configure existing SSL object!")
3421 else:
3422 obj = super(SSL, cls).__new__(cls)
3423 obj._domain = domain
3424 obj._session_details = session_details
3425 session_id = None
3426 if session_details:
3427 session_id = session_details.get_session_id()
3428 obj._ssl = pn_ssl( transport._impl )
3429 if obj._ssl is None:
3430 raise SSLUnavailable()
3431 pn_ssl_init( obj._ssl, domain._domain, session_id )
3432 transport._ssl = obj
3433 return transport._ssl
3434
3436 rc, name = pn_ssl_get_cipher_name( self._ssl, 128 )
3437 if rc:
3438 return name
3439 return None
3440
3442 rc, name = pn_ssl_get_protocol_name( self._ssl, 128 )
3443 if rc:
3444 return name
3445 return None
3446
3447 RESUME_UNKNOWN = PN_SSL_RESUME_UNKNOWN
3448 RESUME_NEW = PN_SSL_RESUME_NEW
3449 RESUME_REUSED = PN_SSL_RESUME_REUSED
3450
3452 return pn_ssl_resume_status( self._ssl )
3453
3455 self._check(pn_ssl_set_peer_hostname( self._ssl, unicode2utf8(hostname) ))
3457 err, name = pn_ssl_get_peer_hostname( self._ssl, 1024 )
3458 self._check(err)
3459 return utf82unicode(name)
3460 peer_hostname = property(_get_peer_hostname, _set_peer_hostname,
3461 doc="""
3462 Manage the expected name of the remote peer. Used to authenticate the remote.
3463 """)
3464
3467 """ Unique identifier for the SSL session. Used to resume previous session on a new
3468 SSL connection.
3469 """
3470
3472 self._session_id = session_id
3473
3475 return self._session_id
3476
3477
3478 wrappers = {
3479 "pn_void": lambda x: pn_void2py(x),
3480 "pn_pyref": lambda x: pn_void2py(x),
3481 "pn_connection": lambda x: Connection.wrap(pn_cast_pn_connection(x)),
3482 "pn_session": lambda x: Session.wrap(pn_cast_pn_session(x)),
3483 "pn_link": lambda x: Link.wrap(pn_cast_pn_link(x)),
3484 "pn_delivery": lambda x: Delivery.wrap(pn_cast_pn_delivery(x)),
3485 "pn_transport": lambda x: Transport.wrap(pn_cast_pn_transport(x)),
3486 "pn_selectable": lambda x: Selectable.wrap(pn_cast_pn_selectable(x))
3487 }
3490
3492 self._impl = pn_collector()
3493
3494 - def put(self, obj, etype):
3495 pn_collector_put(self._impl, PN_PYREF, pn_py2void(obj), etype.number)
3496
3498 return Event.wrap(pn_collector_peek(self._impl))
3499
3501 ev = self.peek()
3502 pn_collector_pop(self._impl)
3503
3505 pn_collector_free(self._impl)
3506 del self._impl
3507
3509
3510 _lock = threading.Lock()
3511 _extended = 10000
3512 TYPES = {}
3513
3514 - def __init__(self, name=None, number=None, method=None):
3515 if name is None and number is None:
3516 raise TypeError("extended events require a name")
3517 try:
3518 self._lock.acquire()
3519 if name is None:
3520 name = pn_event_type_name(number)
3521
3522 if number is None:
3523 number = EventType._extended
3524 EventType._extended += 1
3525
3526 if method is None:
3527 method = "on_%s" % name
3528
3529 self.name = name
3530 self.number = number
3531 self.method = method
3532
3533 self.TYPES[number] = self
3534 finally:
3535 self._lock.release()
3536
3539
3546
3548
3549 - def __init__(self, clazz, context, type):
3553
3556
3557 -def _none(x): return None
3558
3559 DELEGATED = Constant("DELEGATED")
3560
3561 -def _core(number, method):
3562 return EventType(number=number, method=method)
3563
3564 -class Event(Wrapper, EventBase):
3565
3566 REACTOR_INIT = _core(PN_REACTOR_INIT, "on_reactor_init")
3567 REACTOR_QUIESCED = _core(PN_REACTOR_QUIESCED, "on_reactor_quiesced")
3568 REACTOR_FINAL = _core(PN_REACTOR_FINAL, "on_reactor_final")
3569
3570 TIMER_TASK = _core(PN_TIMER_TASK, "on_timer_task")
3571
3572 CONNECTION_INIT = _core(PN_CONNECTION_INIT, "on_connection_init")
3573 CONNECTION_BOUND = _core(PN_CONNECTION_BOUND, "on_connection_bound")
3574 CONNECTION_UNBOUND = _core(PN_CONNECTION_UNBOUND, "on_connection_unbound")
3575 CONNECTION_LOCAL_OPEN = _core(PN_CONNECTION_LOCAL_OPEN, "on_connection_local_open")
3576 CONNECTION_LOCAL_CLOSE = _core(PN_CONNECTION_LOCAL_CLOSE, "on_connection_local_close")
3577 CONNECTION_REMOTE_OPEN = _core(PN_CONNECTION_REMOTE_OPEN, "on_connection_remote_open")
3578 CONNECTION_REMOTE_CLOSE = _core(PN_CONNECTION_REMOTE_CLOSE, "on_connection_remote_close")
3579 CONNECTION_FINAL = _core(PN_CONNECTION_FINAL, "on_connection_final")
3580
3581 SESSION_INIT = _core(PN_SESSION_INIT, "on_session_init")
3582 SESSION_LOCAL_OPEN = _core(PN_SESSION_LOCAL_OPEN, "on_session_local_open")
3583 SESSION_LOCAL_CLOSE = _core(PN_SESSION_LOCAL_CLOSE, "on_session_local_close")
3584 SESSION_REMOTE_OPEN = _core(PN_SESSION_REMOTE_OPEN, "on_session_remote_open")
3585 SESSION_REMOTE_CLOSE = _core(PN_SESSION_REMOTE_CLOSE, "on_session_remote_close")
3586 SESSION_FINAL = _core(PN_SESSION_FINAL, "on_session_final")
3587
3588 LINK_INIT = _core(PN_LINK_INIT, "on_link_init")
3589 LINK_LOCAL_OPEN = _core(PN_LINK_LOCAL_OPEN, "on_link_local_open")
3590 LINK_LOCAL_CLOSE = _core(PN_LINK_LOCAL_CLOSE, "on_link_local_close")
3591 LINK_LOCAL_DETACH = _core(PN_LINK_LOCAL_DETACH, "on_link_local_detach")
3592 LINK_REMOTE_OPEN = _core(PN_LINK_REMOTE_OPEN, "on_link_remote_open")
3593 LINK_REMOTE_CLOSE = _core(PN_LINK_REMOTE_CLOSE, "on_link_remote_close")
3594 LINK_REMOTE_DETACH = _core(PN_LINK_REMOTE_DETACH, "on_link_remote_detach")
3595 LINK_FLOW = _core(PN_LINK_FLOW, "on_link_flow")
3596 LINK_FINAL = _core(PN_LINK_FINAL, "on_link_final")
3597
3598 DELIVERY = _core(PN_DELIVERY, "on_delivery")
3599
3600 TRANSPORT = _core(PN_TRANSPORT, "on_transport")
3601 TRANSPORT_ERROR = _core(PN_TRANSPORT_ERROR, "on_transport_error")
3602 TRANSPORT_HEAD_CLOSED = _core(PN_TRANSPORT_HEAD_CLOSED, "on_transport_head_closed")
3603 TRANSPORT_TAIL_CLOSED = _core(PN_TRANSPORT_TAIL_CLOSED, "on_transport_tail_closed")
3604 TRANSPORT_CLOSED = _core(PN_TRANSPORT_CLOSED, "on_transport_closed")
3605
3606 SELECTABLE_INIT = _core(PN_SELECTABLE_INIT, "on_selectable_init")
3607 SELECTABLE_UPDATED = _core(PN_SELECTABLE_UPDATED, "on_selectable_updated")
3608 SELECTABLE_READABLE = _core(PN_SELECTABLE_READABLE, "on_selectable_readable")
3609 SELECTABLE_WRITABLE = _core(PN_SELECTABLE_WRITABLE, "on_selectable_writable")
3610 SELECTABLE_EXPIRED = _core(PN_SELECTABLE_EXPIRED, "on_selectable_expired")
3611 SELECTABLE_ERROR = _core(PN_SELECTABLE_ERROR, "on_selectable_error")
3612 SELECTABLE_FINAL = _core(PN_SELECTABLE_FINAL, "on_selectable_final")
3613
3614 @staticmethod
3615 - def wrap(impl, number=None):
3616 if impl is None:
3617 return None
3618
3619 if number is None:
3620 number = pn_event_type(impl)
3621
3622 event = Event(impl, number)
3623
3624 if isinstance(event.context, EventBase):
3625 return event.context
3626 else:
3627 return event
3628
3630 Wrapper.__init__(self, impl, pn_event_attachments)
3631 self.__dict__["type"] = EventType.TYPES[number]
3632
3635
3636 @property
3638 cls = pn_event_class(self._impl)
3639 if cls:
3640 return pn_class_name(cls)
3641 else:
3642 return None
3643
3644 @property
3645 - def context(self):
3646 """Returns the context object associated with the event. The type of this depend on the type of event."""
3647 return wrappers[self.clazz](pn_event_context(self._impl))
3648
3649 - def dispatch(self, handler, type=None):
3658
3659
3660 @property
3662 """Returns the reactor associated with the event."""
3663 return wrappers.get("pn_reactor", _none)(pn_event_reactor(self._impl))
3664
3665 @property
3667 """Returns the transport associated with the event, or null if none is associated with it."""
3668 return Transport.wrap(pn_event_transport(self._impl))
3669
3670 @property
3672 """Returns the connection associated with the event, or null if none is associated with it."""
3673 return Connection.wrap(pn_event_connection(self._impl))
3674
3675 @property
3677 """Returns the session associated with the event, or null if none is associated with it."""
3678 return Session.wrap(pn_event_session(self._impl))
3679
3680 @property
3682 """Returns the link associated with the event, or null if none is associated with it."""
3683 return Link.wrap(pn_event_link(self._impl))
3684
3685 @property
3687 """Returns the sender link associated with the event, or null if
3688 none is associated with it. This is essentially an alias for
3689 link(), that does an additional checkon the type of the
3690 link."""
3691 l = self.link
3692 if l and l.is_sender:
3693 return l
3694 else:
3695 return None
3696
3697 @property
3699 """Returns the receiver link associated with the event, or null if
3700 none is associated with it. This is essentially an alias for
3701 link(), that does an additional checkon the type of the link."""
3702 l = self.link
3703 if l and l.is_receiver:
3704 return l
3705 else:
3706 return None
3707
3708 @property
3710 """Returns the delivery associated with the event, or null if none is associated with it."""
3711 return Delivery.wrap(pn_event_delivery(self._impl))
3712
3715
3720
3722
3723 - def __init__(self, handler, on_error=None):
3726
3730
3732 if self.on_error is None:
3733 raise exc, val, tb
3734 else:
3735 self.on_error((exc, val, tb))
3736
3738
3739 @staticmethod
3740 - def wrap(impl, on_error=None):
3741 if impl is None:
3742 return None
3743 else:
3744 handler = WrappedHandler(impl)
3745 handler.__dict__["on_error"] = on_error
3746 return handler
3747
3748 - def __init__(self, impl_or_constructor):
3750
3752 on_error = getattr(self, "on_error", None)
3753 if on_error is None:
3754 raise info[0], info[1], info[2]
3755 else:
3756 on_error(info)
3757
3758 - def add(self, handler):
3759 if handler is None: return
3760 impl = _chandler(handler, self._on_error)
3761 pn_handler_add(self._impl, impl)
3762 pn_decref(impl)
3763
3765 pn_handler_clear(self._impl)
3766
3768 if obj is None:
3769 return None
3770 elif isinstance(obj, WrappedHandler):
3771 impl = obj._impl
3772 pn_incref(impl)
3773 return impl
3774 else:
3775 return pn_pyhandler(_cadapter(obj, on_error))
3776
3778 """
3779 Simple URL parser/constructor, handles URLs of the form:
3780
3781 <scheme>://<user>:<password>@<host>:<port>/<path>
3782
3783 All components can be None if not specifeid in the URL string.
3784
3785 The port can be specified as a service name, e.g. 'amqp' in the
3786 URL string but Url.port always gives the integer value.
3787
3788 @ivar scheme: Url scheme e.g. 'amqp' or 'amqps'
3789 @ivar user: Username
3790 @ivar password: Password
3791 @ivar host: Host name, ipv6 literal or ipv4 dotted quad.
3792 @ivar port: Integer port.
3793 @ivar host_port: Returns host:port
3794 """
3795
3796 AMQPS = "amqps"
3797 AMQP = "amqp"
3798
3800 """An integer port number that can be constructed from a service name string"""
3801
3803 """@param value: integer port number or string service name."""
3804 port = super(Url.Port, cls).__new__(cls, cls._port_int(value))
3805 setattr(port, 'name', str(value))
3806 return port
3807
3808 - def __eq__(self, x): return str(self) == x or int(self) == x
3809 - def __ne__(self, x): return not self == x
3811
3812 @staticmethod
3814 """Convert service, an integer or a service name, into an integer port number."""
3815 try:
3816 return int(value)
3817 except ValueError:
3818 try:
3819 return socket.getservbyname(value)
3820 except socket.error:
3821
3822 if value == Url.AMQPS: return 5671
3823 elif value == Url.AMQP: return 5672
3824 else:
3825 raise ValueError("Not a valid port number or service name: '%s'" % value)
3826
3827 - def __init__(self, url=None, defaults=True, **kwargs):
3828 """
3829 @param url: URL string to parse.
3830 @param defaults: If true, fill in missing default values in the URL.
3831 If false, you can fill them in later by calling self.defaults()
3832 @param kwargs: scheme, user, password, host, port, path.
3833 If specified, replaces corresponding part in url string.
3834 """
3835 if url:
3836 self._url = pn_url_parse(str(url))
3837 if not self._url: raise ValueError("Invalid URL '%s'" % url)
3838 else:
3839 self._url = pn_url()
3840 for k in kwargs:
3841 getattr(self, k)
3842 setattr(self, k, kwargs[k])
3843 if defaults: self.defaults()
3844
3847 self.getter = globals()["pn_url_get_%s" % part]
3848 self.setter = globals()["pn_url_set_%s" % part]
3849 - def __get__(self, obj, type=None): return self.getter(obj._url)
3850 - def __set__(self, obj, value): return self.setter(obj._url, str(value))
3851
3852 scheme = PartDescriptor('scheme')
3853 username = PartDescriptor('username')
3854 password = PartDescriptor('password')
3855 host = PartDescriptor('host')
3856 path = PartDescriptor('path')
3857
3859 portstr = pn_url_get_port(self._url)
3860 return portstr and Url.Port(portstr)
3861
3863 if value is None: pn_url_set_port(self._url, None)
3864 else: pn_url_set_port(self._url, str(Url.Port(value)))
3865
3866 port = property(_get_port, _set_port)
3867
3868 - def __str__(self): return pn_url_str(self._url)
3869
3870 - def __repr__(self): return "Url(%r)" % str(self)
3871
3872 - def __eq__(self, x): return str(self) == str(x)
3873 - def __ne__(self, x): return not self == x
3874
3876 pn_url_free(self._url);
3877 del self._url
3878
3880 """
3881 Fill in missing values (scheme, host or port) with defaults
3882 @return: self
3883 """
3884 self.scheme = self.scheme or self.AMQP
3885 self.host = self.host or '0.0.0.0'
3886 self.port = self.port or self.Port(self.scheme)
3887 return self
3888
3889 __all__ = [
3890 "API_LANGUAGE",
3891 "IMPLEMENTATION_LANGUAGE",
3892 "ABORTED",
3893 "ACCEPTED",
3894 "AUTOMATIC",
3895 "PENDING",
3896 "MANUAL",
3897 "REJECTED",
3898 "RELEASED",
3899 "MODIFIED",
3900 "SETTLED",
3901 "UNDESCRIBED",
3902 "Array",
3903 "Collector",
3904 "Condition",
3905 "Connection",
3906 "Data",
3907 "Delivery",
3908 "Disposition",
3909 "Described",
3910 "Endpoint",
3911 "Event",
3912 "Handler",
3913 "Link",
3914 "Message",
3915 "MessageException",
3916 "Messenger",
3917 "MessengerException",
3918 "ProtonException",
3919 "VERSION_MAJOR",
3920 "VERSION_MINOR",
3921 "Receiver",
3922 "SASL",
3923 "Sender",
3924 "Session",
3925 "SSL",
3926 "SSLDomain",
3927 "SSLSessionDetails",
3928 "SSLUnavailable",
3929 "SSLException",
3930 "Terminus",
3931 "Timeout",
3932 "Interrupt",
3933 "Transport",
3934 "TransportException",
3935 "Url",
3936 "char",
3937 "dispatch",
3938 "symbol",
3939 "timestamp",
3940 "ulong"
3941 ]
3942