Package proton :: Module utils
[frames | no frames]

Source Code for Module proton.utils

  1  # 
  2  # Licensed to the Apache Software Foundation (ASF) under one 
  3  # or more contributor license agreements.  See the NOTICE file 
  4  # distributed with this work for additional information 
  5  # regarding copyright ownership.  The ASF licenses this file 
  6  # to you under the Apache License, Version 2.0 (the 
  7  # "License"); you may not use this file except in compliance 
  8  # with the License.  You may obtain a copy of the License at 
  9  # 
 10  #   http://www.apache.org/licenses/LICENSE-2.0 
 11  # 
 12  # Unless required by applicable law or agreed to in writing, 
 13  # software distributed under the License is distributed on an 
 14  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
 15  # KIND, either express or implied.  See the License for the 
 16  # specific language governing permissions and limitations 
 17  # under the License. 
 18  # 
 19  import collections, socket, time, threading 
 20   
 21  from proton import ConnectionException, Delivery, Endpoint, Handler, Link, LinkException, Message 
 22  from proton import ProtonException, Timeout, Url 
 23  from proton.reactor import Container 
 24  from proton.handlers import MessagingHandler, IncomingMessageHandler 
 55   
class SendException(ProtonException):
    """
    Exception used to indicate an exceptional state/condition on a send request.

    @ivar state: The state value handed to the constructor
        (L{BlockingSender.send} passes the delivery's remote state).
    """

    def __init__(self, state):
        # Keep the offending state so the caller can inspect it.
        self.state = state
62
63 -def _is_settled(delivery):
64 return delivery.settled or delivery.link.snd_settle_mode == Link.SND_SETTLED
65
class BlockingSender(BlockingLink):
    """Sender wrapper whose L{send} waits on the connection until the delivery is settled."""

    def __init__(self, connection, sender):
        """
        Wrap C{sender} and verify that the peer agreed on the requested target.

        @raise LinkException: if a target address was requested and the remote
            target address differs from it.
        """
        super(BlockingSender, self).__init__(connection, sender)
        target = self.link.target
        if not (target and target.address):
            # No specific target address was requested; nothing to verify.
            return
        if target.address == self.link.remote_target.address:
            return
        # this may be followed by a detach, which may contain an error
        # condition, so wait a little...
        self._waitForClose()
        # ...but close ourselves if peer does not
        self.link.close()
        raise LinkException("Failed to open sender %s, target does not match" % self.link.name)

    def send(self, msg, timeout=False, error_states=None):
        """
        Send C{msg} and wait until the delivery is settled.

        @param msg: The message to send on the wrapped link.
        @param timeout: Passed through to the connection's wait().
        @param error_states: Remote delivery states that should raise; when
            None, C{Delivery.REJECTED} and C{Delivery.RELEASED} are used.
        @return: The delivery object for the sent message.
        @raise SendException: if the remote state is one of the error states.
        """
        delivery = self.link.send(msg)
        description = "Sending on sender %s" % self.link.name
        self.connection.wait(lambda: _is_settled(delivery), msg=description, timeout=timeout)
        # Settle locally unless the link sends everything pre-settled.
        if delivery.link.snd_settle_mode != Link.SND_SETTLED:
            delivery.settle()
        if error_states is None:
            failure_states = [Delivery.REJECTED, Delivery.RELEASED]
        else:
            failure_states = error_states
        if delivery.remote_state in failure_states:
            raise SendException(delivery.remote_state)
        return delivery
87
88 -class Fetcher(MessagingHandler):
89 - def __init__(self, connection, prefetch):
90 super(Fetcher, self).__init__(prefetch=prefetch, auto_accept=False) 91 self.connection = connection 92 self.incoming = collections.deque([]) 93 self.unsettled = collections.deque([])
94
95 - def on_message(self, event):
96 self.incoming.append((event.message, event.delivery)) 97 self.connection.container.yield_() # Wake up the wait() loop to handle the message.
98 103
104 - def on_connection_error(self, event):
105 raise ConnectionClosed(event.connection)
106 107 @property
108 - def has_message(self):
109 return len(self.incoming)
110
111 - def pop(self):
112 message, delivery = self.incoming.popleft() 113 if not delivery.settled: 114 self.unsettled.append(delivery) 115 return message
116
117 - def settle(self, state=None):
118 delivery = self.unsettled.popleft() 119 if state: 120 delivery.update(state) 121 delivery.settle()
122
123 124 -class BlockingReceiver(BlockingLink):
125 - def __init__(self, connection, receiver, fetcher, credit=1):
126 super(BlockingReceiver, self).__init__(connection, receiver) 127 if self.link.source and self.link.source.address and self.link.source.address != self.link.remote_source.address: 128 #this may be followed by a detach, which may contain an error condition, so wait a little... 129 self._waitForClose() 130 #...but close ourselves if peer does not 131 self.link.close() 132 raise LinkException("Failed to open receiver %s, source does not match" % self.link.name) 133 if credit: receiver.flow(credit) 134 self.fetcher = fetcher
135
136 - def __del__(self):
137 self.fetcher = None 138 self.link.handler = None
139
140 - def receive(self, timeout=False):
141 if not self.fetcher: 142 raise Exception("Can't call receive on this receiver as a handler was provided") 143 if not self.link.credit: 144 self.link.flow(1) 145 self.connection.wait(lambda: self.fetcher.has_message, msg="Receiving on receiver %s" % self.link.name, timeout=timeout) 146 return self.fetcher.pop()
147
148 - def accept(self):
150
151 - def reject(self):
153
154 - def release(self, delivered=True):
155 if delivered: 156 self.settle(Delivery.MODIFIED) 157 else: 158 self.settle(Delivery.RELEASED)
159
160 - def settle(self, state=None):
161 if not self.fetcher: 162 raise Exception("Can't call accept/reject etc on this receiver as a handler was provided") 163 self.fetcher.settle(state)
164
class LinkDetached(LinkException):
    """
    Raised for a link that has been closed.

    @ivar link: The link passed to the constructor.
    @ivar condition: Name of the link's remote condition, or None if the
        link carries no remote condition.
    """

    def __init__(self, link):
        self.link = link
        if link.is_sender:
            description = "sender %s to %s closed" % (link.name, link.target.address)
        else:
            description = "receiver %s from %s closed" % (link.name, link.source.address)
        cause = link.remote_condition
        if cause:
            description += " due to: %s" % cause
            self.condition = cause.name
        else:
            description += " by peer"
            self.condition = None
        super(LinkDetached, self).__init__(description)
180
class ConnectionClosed(ConnectionException):
    """
    Raised for a connection that has been closed.

    @ivar connection: The connection passed to the constructor.
    @ivar condition: Name of the connection's remote condition, or None if
        the connection carries no remote condition.
    """

    def __init__(self, connection):
        self.connection = connection
        description = "Connection %s closed" % connection.hostname
        cause = connection.remote_condition
        if cause:
            description += " due to: %s" % cause
            self.condition = cause.name
        else:
            description += " by peer"
            self.condition = None
        super(ConnectionClosed, self).__init__(description)
193
194 195 -class BlockingConnection(Handler):
196 """ 197 A synchronous style connection wrapper. 198 """
199 - def __init__(self, url, timeout=None, container=None, ssl_domain=None, heartbeat=None):
200 self.disconnected = False 201 self.timeout = timeout or 60 202 self.container = container or Container() 203 self.container.timeout = self.timeout 204 self.container.start() 205 self.url = Url(url).defaults() 206 self.conn = self.container.connect(url=self.url, handler=self, ssl_domain=ssl_domain, reconnect=False, heartbeat=heartbeat) 207 self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_UNINIT), 208 msg="Opening connection")
209
210 - def create_sender(self, address, handler=None, name=None, options=None):
211 return BlockingSender(self, self.container.create_sender(self.conn, address, name=name, handler=handler, options=options))
212
213 - def create_receiver(self, address, credit=None, dynamic=False, handler=None, name=None, options=None):
214 prefetch = credit 215 if handler: 216 fetcher = None 217 if prefetch is None: 218 prefetch = 1 219 else: 220 fetcher = Fetcher(self, credit) 221 return BlockingReceiver( 222 self, self.container.create_receiver(self.conn, address, name=name, dynamic=dynamic, handler=handler or fetcher, options=options), fetcher, credit=prefetch)
223
224 - def close(self):
225 self.conn.close() 226 try: 227 self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_ACTIVE), 228 msg="Closing connection") 229 finally: 230 self.conn = None 231 self.container = None
232
233 - def _is_closed(self):
234 return self.conn.state & (Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED)
235
236 - def run(self):
237 """ Hand control over to the event loop (e.g. if waiting indefinitely for incoming messages) """ 238 while self.container.process(): pass
239
240 - def wait(self, condition, timeout=False, msg=None):
241 """Call process until condition() is true""" 242 if timeout is False: 243 timeout = self.timeout 244 if timeout is None: 245 while not condition() and not self.disconnected: 246 self.container.process() 247 else: 248 container_timeout = self.container.timeout 249 self.container.timeout = timeout 250 try: 251 deadline = time.time() + timeout 252 while not condition() and not self.disconnected: 253 self.container.process() 254 if deadline < time.time(): 255 txt = "Connection %s timed out" % self.url 256 if msg: txt += ": " + msg 257 raise Timeout(txt) 258 finally: 259 self.container.timeout = container_timeout 260 if self.disconnected or self._is_closed(): 261 self.container.stop() 262 self.conn.handler = None # break cyclical reference 263 if self.disconnected and not self._is_closed(): 264 raise ConnectionException("Connection %s disconnected" % self.url)
265 270
271 - def on_connection_remote_close(self, event):
272 if event.connection.state & Endpoint.LOCAL_ACTIVE: 273 event.connection.close() 274 raise ConnectionClosed(event.connection)
275
276 - def on_transport_tail_closed(self, event):
277 self.on_transport_closed(event)
278
279 - def on_transport_head_closed(self, event):
280 self.on_transport_closed(event)
281
282 - def on_transport_closed(self, event):
283 self.disconnected = True
284
class AtomicCount(object):
    """Counter whose increments are serialised by a lock."""

    def __init__(self, start=0, step=1):
        """Thread-safe atomic counter. Start at start, increment by step."""
        self.count, self.step = start, step
        self.lock = threading.Lock()

    def next(self):
        """
        Get the next value.

        Adds C{step} to the counter and returns the new value. The lock is
        held through a C{with} block so that it is released even if the
        addition raises (for example on incompatible start/step types);
        a bare acquire()/release() pair would leave the lock held forever
        and deadlock every later caller.
        """
        with self.lock:
            self.count += self.step
            return self.count
298
class SyncRequestResponse(IncomingMessageHandler):
    """
    Implementation of the synchronous request-response (aka RPC) pattern.
    @ivar address: Address for all requests, may be None.
    @ivar connection: Connection for requests and responses.
    """

    # Class-level counter: correlation ids are drawn from one shared sequence.
    correlation_id = AtomicCount()

    def __init__(self, connection, address=None):
        """
        Send requests and receive responses. A single instance can send many requests
        to the same or different addresses.

        @param connection: A L{BlockingConnection}
        @param address: Address for all requests.
            If not specified, each request must have the address property set.
            Successive messages may have different addresses.
        """
        super(SyncRequestResponse, self).__init__()
        self.connection = connection
        self.address = address
        self.sender = self.connection.create_sender(self.address)
        # dynamic=True generates a unique address dynamically for this receiver.
        # credit=1 because we want to receive 1 response message initially.
        self.receiver = self.connection.create_receiver(
            None, dynamic=True, credit=1, handler=self)
        self.response = None

    def call(self, request):
        """
        Send a request message, wait for and return the response message.

        @param request: A L{proton.Message}. If L{self.address} is not set,
            the request's own address must be set.
        @raise ValueError: if neither L{self.address} nor the request's
            address is set.
        """
        if not (self.address or request.address):
            raise ValueError("Request message has no address: %s" % request)
        request.reply_to = self.reply_to
        expected_id = self.correlation_id.next()
        request.correlation_id = expected_id
        self.sender.send(request)

        def response_arrived():
            # Only a stored response carrying our correlation id counts.
            return self.response and (self.response.correlation_id == expected_id)

        self.connection.wait(response_arrived, msg="Waiting for response")
        response = self.response
        self.response = None      # Ready for next response.
        self.receiver.flow(1)     # Set up credit for the next response.
        return response

    @property
    def reply_to(self):
        """Return the dynamic address of our receiver."""
        remote_source = self.receiver.remote_source
        return remote_source.address

    def on_message(self, event):
        """Called when we receive a message for our receiver."""
        self.response = event.message
        # Wake up the wait() loop to handle the message.
        self.connection.container.yield_()
356