Home | Trees | Indices | Help |
---|
|
1 # 2 # Licensed to the Apache Software Foundation (ASF) under one 3 # or more contributor license agreements. See the NOTICE file 4 # distributed with this work for additional information 5 # regarding copyright ownership. The ASF licenses this file 6 # to you under the Apache License, Version 2.0 (the 7 # "License"); you may not use this file except in compliance 8 # with the License. You may obtain a copy of the License at 9 # 10 # http://www.apache.org/licenses/LICENSE-2.0 11 # 12 # Unless required by applicable law or agreed to in writing, 13 # software distributed under the License is distributed on an 14 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 15 # KIND, either express or implied. See the License for the 16 # specific language governing permissions and limitations 17 # under the License. 18 # 19 import collections, Queue, socket, time, threading 20 from proton import ConnectionException, Delivery, Endpoint, Handler, LinkException, Message 21 from proton import ProtonException, Timeout, Url 22 from proton.reactor import Container 23 from proton.handlers import MessagingHandler, IncomingMessageHandler 3057 6433 self.connection = connection 34 self.link = link 35 self.connection.wait(lambda: not (self.link.state & Endpoint.REMOTE_UNINIT), 36 msg="Opening link %s" % link.name) 37 self._checkClosed()3840 self.connection.wait(lambda: self.link.state & Endpoint.REMOTE_CLOSED, 41 timeout=timeout, 42 msg="Opening link %s" % self.link.name) 43 self._checkClosed()4446 if self.link.state & Endpoint.REMOTE_CLOSED: 47 self.link.close() 48 raise LinkDetached(self.link)4951 self.link.close() 52 self.connection.wait(lambda: not (self.link.state & Endpoint.REMOTE_ACTIVE), 53 msg="Closing link %s" % self.link.name)54 55 # Access to other link attributes.8467 super(BlockingSender, self).__init__(connection, sender) 68 if self.link.target and self.link.target.address and self.link.target.address != self.link.remote_target.address: 69 #this may be followed by a detach, which may contain an error condition, so wait a little... 70 self._waitForClose() 71 #...but close ourselves if peer does not 72 self.link.close() 73 raise LinkException("Failed to open sender %s, target does not match" % self.link.name)7476 delivery = self.link.send(msg) 77 self.connection.wait(lambda: delivery.settled, msg="Sending on sender %s" % self.link.name, timeout=timeout) 78 bad = error_states 79 if bad is None: 80 bad = [Delivery.REJECTED, Delivery.RELEASED] 81 if delivery.remote_state in bad: 82 raise SendException(delivery.remote_state) 83 return delivery11987 super(Fetcher, self).__init__(prefetch=prefetch, auto_accept=False) 88 self.connection = connection 89 self.incoming = collections.deque([]) 90 self.unsettled = collections.deque([])9193 self.incoming.append((event.message, event.delivery)) 94 self.connection.container.yield_() # Wake up the wait() loop to handle the message.9597 if event.link.state & Endpoint.LOCAL_ACTIVE: 98 event.link.close() 99 raise LinkDetached(event.link)100 103 104 @property106 return len(self.incoming)107109 message, delivery = self.incoming.popleft() 110 if not delivery.settled: 111 self.unsettled.append(delivery) 112 return message113157123 super(BlockingReceiver, self).__init__(connection, receiver) 124 if self.link.source and self.link.source.address and self.link.source.address != self.link.remote_source.address: 125 #this may be followed by a detach, which may contain an error condition, so wait a little... 126 self._waitForClose() 127 #...but close ourselves if peer does not 128 self.link.close() 129 raise LinkException("Failed to open receiver %s, source does not match" % self.link.name) 130 if credit: receiver.flow(credit) 131 self.fetcher = fetcher132134 if not self.fetcher: 135 raise Exception("Can't call receive on this receiver as a handler was provided") 136 if not self.link.credit: 137 self.link.flow(1) 138 self.connection.wait(lambda: self.fetcher.has_message, msg="Receiving on receiver %s" % self.link.name, timeout=timeout) 139 return self.fetcher.pop()140 143 146 152173161 self.link = link 162 if link.is_sender: 163 txt = "sender %s to %s closed" % (link.name, link.target.address) 164 else: 165 txt = "receiver %s from %s closed" % (link.name, link.source.address) 166 if link.remote_condition: 167 txt += " due to: %s" % link.remote_condition 168 self.condition = link.remote_condition.name 169 else: 170 txt += " by peer" 171 self.condition = None 172 super(LinkDetached, self).__init__(txt)186177 self.connection = connection 178 txt = "Connection %s closed" % self.url 179 if event.connection.remote_condition: 180 txt += " due to: %s" % event.connection.remote_condition 181 self.condition = connection.remote_condition.name 182 else: 183 txt += " by peer" 184 self.condition = None 185 super(ConnectionClosed, self).__init__(txt)189 """ 190 A synchronous style connection wrapper. 191 """262193 self.timeout = timeout 194 self.container = container or Container() 195 self.container.timeout = self.timeout 196 self.container.start() 197 self.url = Url(utf8(url)).defaults() 198 self.conn = self.container.connect(url=self.url, handler=self, ssl_domain=ssl_domain, reconnect=False) 199 self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_UNINIT), 200 msg="Opening connection")201203 return BlockingSender(self, self.container.create_sender(self.conn, utf8(address), name=utf8(name), handler=handler, options=options))204205 - def create_receiver(self, address, credit=None, dynamic=False, handler=None, name=None, options=None):206 prefetch = credit 207 if handler: 208 fetcher = None 209 if prefetch is None: 210 prefetch = 1 211 else: 212 fetcher = Fetcher(self, credit) 213 return BlockingReceiver( 214 self, self.container.create_receiver(self.conn, utf8(address), name=utf8(name), dynamic=dynamic, handler=handler or fetcher, options=options), fetcher, credit=prefetch)215217 self.conn.close() 218 self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_ACTIVE), 219 msg="Closing connection")220222 """ Hand control over to the event loop (e.g. if waiting indefinitely for incoming messages) """ 223 while self.container.process(): pass224226 """Call process until condition() is true""" 227 if timeout is False: 228 timeout = self.timeout 229 if timeout is None: 230 while not condition(): 231 self.container.process() 232 else: 233 container_timeout = self.container.timeout 234 self.container.timeout = timeout 235 try: 236 deadline = time.time() + timeout 237 while not condition(): 238 self.container.process() 239 if deadline < time.time(): 240 txt = "Connection %s timed out" % self.url 241 if msg: txt += ": " + msg 242 raise Timeout(txt) 243 finally: 244 self.container.timeout = container_timeout245247 if event.link.state & Endpoint.LOCAL_ACTIVE: 248 event.link.close() 249 raise LinkDetached(event.link)250252 if event.connection.state & Endpoint.LOCAL_ACTIVE: 253 event.connection.close() 254 raise ConnectionClosed(event.connection)255257 self.on_transport_closed(event)258260 if event.connection.state & Endpoint.LOCAL_ACTIVE: 261 raise ConnectionException("Connection %s disconnected" % self.url);276265 """Thread-safe atomic counter. Start at start, increment by step.""" 266 self.count, self.step = start, step 267 self.lock = threading.Lock()268270 """Get the next value""" 271 self.lock.acquire() 272 self.count += self.step; 273 result = self.count 274 self.lock.release() 275 return result278 """ 279 Implementation of the synchronous request-responce (aka RPC) pattern. 280 @ivar address: Address for all requests, may be None. 281 @ivar connection: Connection for requests and responses. 282 """ 283 284 correlation_id = AtomicCount() 285324 325 @property287 """ 288 Send requests and receive responses. A single instance can send many requests 289 to the same or different addresses. 290 291 @param connection: A L{BlockingConnection} 292 @param address: Address for all requests. 293 If not specified, each request must have the address property set. 294 Sucessive messages may have different addresses. 295 """ 296 super(SyncRequestResponse, self).__init__() 297 self.connection = connection 298 self.address = address 299 self.sender = self.connection.create_sender(self.address) 300 # dynamic=true generates a unique address dynamically for this receiver. 301 # credit=1 because we want to receive 1 response message initially. 302 self.receiver = self.connection.create_receiver(None, dynamic=True, credit=1, handler=self) 303 self.response = None304306 """ 307 Send a request message, wait for and return the response message. 308 309 @param request: A L{proton.Message}. If L{self.address} is not set the 310 L{self.address} must be set and will be used. 311 """ 312 if not self.address and not request.address: 313 raise ValueError("Request message has no address: %s" % request) 314 request.reply_to = self.reply_to 315 request.correlation_id = correlation_id = self.correlation_id.next() 316 self.sender.send(request) 317 def wakeup(): 318 return self.response and (self.response.correlation_id == correlation_id)319 self.connection.wait(wakeup, msg="Waiting for response") 320 response = self.response 321 self.response = None # Ready for next response. 322 self.receiver.flow(1) # Set up credit for the next response. 323 return response327 """Return the dynamic address of our receiver.""" 328 return self.receiver.remote_source.address329331 """Called when we receive a message for our receiver.""" 332 self.response = event.message 333 self.connection.container.yield_() # Wake up the wait() loop to handle the message.334
Home | Trees | Indices | Help |
---|
Generated by Epydoc 3.0.1 on Thu Apr 9 15:19:28 2015 | http://epydoc.sourceforge.net |