Package proton :: Module utils
[frames] | no frames]

Source Code for Module proton.utils

  1  # 
  2  # Licensed to the Apache Software Foundation (ASF) under one 
  3  # or more contributor license agreements.  See the NOTICE file 
  4  # distributed with this work for additional information 
  5  # regarding copyright ownership.  The ASF licenses this file 
  6  # to you under the Apache License, Version 2.0 (the 
  7  # "License"); you may not use this file except in compliance 
  8  # with the License.  You may obtain a copy of the License at 
  9  # 
 10  #   http://www.apache.org/licenses/LICENSE-2.0 
 11  # 
 12  # Unless required by applicable law or agreed to in writing, 
 13  # software distributed under the License is distributed on an 
 14  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
 15  # KIND, either express or implied.  See the License for the 
 16  # specific language governing permissions and limitations 
 17  # under the License. 
 18  # 
 19  import collections, Queue, socket, time, threading 
 20  from proton import ConnectionException, Delivery, Endpoint, Handler, LinkException, Message 
 21  from proton import ProtonException, Timeout, Url 
 22  from proton.reactor import Container 
 23  from proton.handlers import MessagingHandler, IncomingMessageHandler 
24 25 -def utf8(s):
26 if isinstance(s, unicode): 27 return s.encode('utf8') 28 else: 29 return s
30 57
58 -class SendException(ProtonException):
59 """ 60 Exception used to indicate an exceptional state/condition on a send request 61 """
62 - def __init__(self, state):
63 self.state = state
64
65 -class BlockingSender(BlockingLink):
66 - def __init__(self, connection, sender):
67 super(BlockingSender, self).__init__(connection, sender) 68 if self.link.target and self.link.target.address and self.link.target.address != self.link.remote_target.address: 69 #this may be followed by a detach, which may contain an error condition, so wait a little... 70 self._waitForClose() 71 #...but close ourselves if peer does not 72 self.link.close() 73 raise LinkException("Failed to open sender %s, target does not match" % self.link.name)
74
75 - def send(self, msg, timeout=False, error_states=None):
76 delivery = self.link.send(msg) 77 self.connection.wait(lambda: delivery.settled, msg="Sending on sender %s" % self.link.name, timeout=timeout) 78 bad = error_states 79 if bad is None: 80 bad = [Delivery.REJECTED, Delivery.RELEASED] 81 if delivery.remote_state in bad: 82 raise SendException(delivery.remote_state) 83 return delivery
84
85 -class Fetcher(MessagingHandler):
86 - def __init__(self, connection, prefetch):
87 super(Fetcher, self).__init__(prefetch=prefetch, auto_accept=False) 88 self.connection = connection 89 self.incoming = collections.deque([]) 90 self.unsettled = collections.deque([])
91
92 - def on_message(self, event):
93 self.incoming.append((event.message, event.delivery)) 94 self.connection.container.yield_() # Wake up the wait() loop to handle the message.
95 100
101 - def on_connection_error(self, event):
102 raise ConnectionClosed(event.connection)
103 104 @property
105 - def has_message(self):
106 return len(self.incoming)
107
108 - def pop(self):
109 message, delivery = self.incoming.popleft() 110 if not delivery.settled: 111 self.unsettled.append(delivery) 112 return message
113
114 - def settle(self, state=None):
115 delivery = self.unsettled.popleft() 116 if state: 117 delivery.update(state) 118 delivery.settle()
119
120 121 -class BlockingReceiver(BlockingLink):
122 - def __init__(self, connection, receiver, fetcher, credit=1):
123 super(BlockingReceiver, self).__init__(connection, receiver) 124 if self.link.source and self.link.source.address and self.link.source.address != self.link.remote_source.address: 125 #this may be followed by a detach, which may contain an error condition, so wait a little... 126 self._waitForClose() 127 #...but close ourselves if peer does not 128 self.link.close() 129 raise LinkException("Failed to open receiver %s, source does not match" % self.link.name) 130 if credit: receiver.flow(credit) 131 self.fetcher = fetcher
132
133 - def receive(self, timeout=False):
134 if not self.fetcher: 135 raise Exception("Can't call receive on this receiver as a handler was provided") 136 if not self.link.credit: 137 self.link.flow(1) 138 self.connection.wait(lambda: self.fetcher.has_message, msg="Receiving on receiver %s" % self.link.name, timeout=timeout) 139 return self.fetcher.pop()
140
141 - def accept(self):
143
144 - def reject(self):
146
147 - def release(self, delivered=True):
148 if delivered: 149 self.settle(Delivery.MODIFIED) 150 else: 151 self.settle(Delivery.RELEASED)
152
153 - def settle(self, state=None):
154 if not self.fetcher: 155 raise Exception("Can't call accept/reject etc on this receiver as a handler was provided") 156 self.fetcher.settle(state)
157
158 159 -class LinkDetached(LinkException):
160 - def __init__(self, link):
161 self.link = link 162 if link.is_sender: 163 txt = "sender %s to %s closed" % (link.name, link.target.address) 164 else: 165 txt = "receiver %s from %s closed" % (link.name, link.source.address) 166 if link.remote_condition: 167 txt += " due to: %s" % link.remote_condition 168 self.condition = link.remote_condition.name 169 else: 170 txt += " by peer" 171 self.condition = None 172 super(LinkDetached, self).__init__(txt)
173
174 175 -class ConnectionClosed(ConnectionException):
176 - def __init__(self, connection):
177 self.connection = connection 178 txt = "Connection %s closed" % self.url 179 if event.connection.remote_condition: 180 txt += " due to: %s" % event.connection.remote_condition 181 self.condition = connection.remote_condition.name 182 else: 183 txt += " by peer" 184 self.condition = None 185 super(ConnectionClosed, self).__init__(txt)
186
187 188 -class BlockingConnection(Handler):
189 """ 190 A synchronous style connection wrapper. 191 """
192 - def __init__(self, url, timeout=None, container=None, ssl_domain=None):
193 self.timeout = timeout 194 self.container = container or Container() 195 self.container.timeout = self.timeout 196 self.container.start() 197 self.url = Url(utf8(url)).defaults() 198 self.conn = self.container.connect(url=self.url, handler=self, ssl_domain=ssl_domain, reconnect=False) 199 self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_UNINIT), 200 msg="Opening connection")
201
202 - def create_sender(self, address, handler=None, name=None, options=None):
203 return BlockingSender(self, self.container.create_sender(self.conn, utf8(address), name=utf8(name), handler=handler, options=options))
204
205 - def create_receiver(self, address, credit=None, dynamic=False, handler=None, name=None, options=None):
206 prefetch = credit 207 if handler: 208 fetcher = None 209 if prefetch is None: 210 prefetch = 1 211 else: 212 fetcher = Fetcher(self, credit) 213 return BlockingReceiver( 214 self, self.container.create_receiver(self.conn, utf8(address), name=utf8(name), dynamic=dynamic, handler=handler or fetcher, options=options), fetcher, credit=prefetch)
215
216 - def close(self):
217 self.conn.close() 218 self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_ACTIVE), 219 msg="Closing connection")
220
221 - def run(self):
222 """ Hand control over to the event loop (e.g. if waiting indefinitely for incoming messages) """ 223 while self.container.process(): pass
224
225 - def wait(self, condition, timeout=False, msg=None):
226 """Call process until condition() is true""" 227 if timeout is False: 228 timeout = self.timeout 229 if timeout is None: 230 while not condition(): 231 self.container.process() 232 else: 233 container_timeout = self.container.timeout 234 self.container.timeout = timeout 235 try: 236 deadline = time.time() + timeout 237 while not condition(): 238 self.container.process() 239 if deadline < time.time(): 240 txt = "Connection %s timed out" % self.url 241 if msg: txt += ": " + msg 242 raise Timeout(txt) 243 finally: 244 self.container.timeout = container_timeout
245 250
251 - def on_connection_remote_close(self, event):
252 if event.connection.state & Endpoint.LOCAL_ACTIVE: 253 event.connection.close() 254 raise ConnectionClosed(event.connection)
255
256 - def on_transport_tail_closed(self, event):
257 self.on_transport_closed(event)
258
259 - def on_transport_closed(self, event):
260 if event.connection.state & Endpoint.LOCAL_ACTIVE: 261 raise ConnectionException("Connection %s disconnected" % self.url);
262
263 -class AtomicCount(object):
264 - def __init__(self, start=0, step=1):
265 """Thread-safe atomic counter. Start at start, increment by step.""" 266 self.count, self.step = start, step 267 self.lock = threading.Lock()
268
269 - def next(self):
270 """Get the next value""" 271 self.lock.acquire() 272 self.count += self.step; 273 result = self.count 274 self.lock.release() 275 return result
276
277 -class SyncRequestResponse(IncomingMessageHandler):
278 """ 279 Implementation of the synchronous request-responce (aka RPC) pattern. 280 @ivar address: Address for all requests, may be None. 281 @ivar connection: Connection for requests and responses. 282 """ 283 284 correlation_id = AtomicCount() 285
286 - def __init__(self, connection, address=None):
287 """ 288 Send requests and receive responses. A single instance can send many requests 289 to the same or different addresses. 290 291 @param connection: A L{BlockingConnection} 292 @param address: Address for all requests. 293 If not specified, each request must have the address property set. 294 Sucessive messages may have different addresses. 295 """ 296 super(SyncRequestResponse, self).__init__() 297 self.connection = connection 298 self.address = address 299 self.sender = self.connection.create_sender(self.address) 300 # dynamic=true generates a unique address dynamically for this receiver. 301 # credit=1 because we want to receive 1 response message initially. 302 self.receiver = self.connection.create_receiver(None, dynamic=True, credit=1, handler=self) 303 self.response = None
304
305 - def call(self, request):
306 """ 307 Send a request message, wait for and return the response message. 308 309 @param request: A L{proton.Message}. If L{self.address} is not set the 310 L{self.address} must be set and will be used. 311 """ 312 if not self.address and not request.address: 313 raise ValueError("Request message has no address: %s" % request) 314 request.reply_to = self.reply_to 315 request.correlation_id = correlation_id = self.correlation_id.next() 316 self.sender.send(request) 317 def wakeup(): 318 return self.response and (self.response.correlation_id == correlation_id)
319 self.connection.wait(wakeup, msg="Waiting for response") 320 response = self.response 321 self.response = None # Ready for next response. 322 self.receiver.flow(1) # Set up credit for the next response. 323 return response
324 325 @property
326 - def reply_to(self):
327 """Return the dynamic address of our receiver.""" 328 return self.receiver.remote_source.address
329
330 - def on_message(self, event):
331 """Called when we receive a message for our receiver.""" 332 self.response = event.message 333 self.connection.container.yield_() # Wake up the wait() loop to handle the message.
334