001/* 002 * Copyright 2009-2014 UnboundID Corp. 003 * All Rights Reserved. 004 */ 005/* 006 * Copyright (C) 2009-2014 UnboundID Corp. 007 * 008 * This program is free software; you can redistribute it and/or modify 009 * it under the terms of the GNU General Public License (GPLv2 only) 010 * or the terms of the GNU Lesser General Public License (LGPLv2.1 only) 011 * as published by the Free Software Foundation. 012 * 013 * This program is distributed in the hope that it will be useful, 014 * but WITHOUT ANY WARRANTY; without even the implied warranty of 015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 016 * GNU General Public License for more details. 017 * 018 * You should have received a copy of the GNU General Public License 019 * along with this program; if not, see <http://www.gnu.org/licenses>. 020 */ 021package com.unboundid.ldap.sdk; 022 023 024 025import java.util.concurrent.LinkedBlockingQueue; 026import java.util.concurrent.TimeUnit; 027import java.util.concurrent.atomic.AtomicBoolean; 028import java.util.concurrent.atomic.AtomicReference; 029 030import com.unboundid.util.InternalUseOnly; 031import com.unboundid.util.ThreadSafety; 032import com.unboundid.util.ThreadSafetyLevel; 033 034import static com.unboundid.ldap.sdk.LDAPMessages.*; 035import static com.unboundid.util.Debug.*; 036import static com.unboundid.util.Validator.*; 037 038 039 040/** 041 * This class provides an {@link EntrySource} that will read entries matching a 042 * given set of search criteria from an LDAP directory server. It may 043 * optionally close the associated connection after all entries have been read. 044 * <BR><BR> 045 * This implementation processes the search asynchronously, which provides two 046 * benefits: 047 * <UL> 048 * <LI>It makes it easier to provide a throttling mechanism to prevent the 049 * entries from piling up and causing the client to run out of memory if 050 * the server returns them faster than the client can process them. If 051 * this occurs, then the client will queue up a small number of entries 052 * but will then push back against the server to block it from sending 053 * additional entries until the client can catch up. In this case, no 054 * entries should be lost, although some servers may impose limits on how 055 * long a search may be active or other forms of constraints.</LI> 056 * <LI>It makes it possible to abandon the search if the entry source is no 057 * longer needed (as signified by calling the {@link #close} method) and 058 * the caller intends to stop iterating through the results.</LI> 059 * </UL> 060 * <H2>Example</H2> 061 * The following example demonstrates the process that may be used for iterating 062 * across all entries containing the {@code person} object class using the LDAP 063 * entry source API: 064 * <PRE> 065 * SearchRequest searchRequest = new SearchRequest("dc=example,dc=com", 066 * SearchScope.SUB, Filter.createEqualityFilter("objectClass", "person")); 067 * LDAPEntrySource entrySource = new LDAPEntrySource(connection, 068 * searchRequest, false); 069 * 070 * int entriesRead = 0; 071 * int referencesRead = 0; 072 * int exceptionsCaught = 0; 073 * try 074 * { 075 * while (true) 076 * { 077 * try 078 * { 079 * Entry entry = entrySource.nextEntry(); 080 * if (entry == null) 081 * { 082 * // There are no more entries to be read. 083 * break; 084 * } 085 * else 086 * { 087 * // Do something with the entry here. 088 * entriesRead++; 089 * } 090 * } 091 * catch (SearchResultReferenceEntrySourceException e) 092 * { 093 * // The directory server returned a search result reference. 094 * SearchResultReference searchReference = e.getSearchReference(); 095 * referencesRead++; 096 * } 097 * catch (EntrySourceException e) 098 * { 099 * // Some kind of problem was encountered (e.g., the connection is no 100 * // longer valid). See if we can continue reading entries. 101 * exceptionsCaught++; 102 * if (! e.mayContinueReading()) 103 * { 104 * break; 105 * } 106 * } 107 * } 108 * } 109 * finally 110 * { 111 * entrySource.close(); 112 * } 113 * </PRE> 114 */ 115@ThreadSafety(level=ThreadSafetyLevel.NOT_THREADSAFE) 116public final class LDAPEntrySource 117 extends EntrySource 118 implements AsyncSearchResultListener 119{ 120 /** 121 * The bogus entry that will be used to signify the end of the results. 122 */ 123 private static final String END_OF_RESULTS = "END OF RESULTS"; 124 125 126 127 /** 128 * The serial version UID for this serializable class. 129 */ 130 private static final long serialVersionUID = 1080386705549149135L; 131 132 133 134 // The request ID associated with the asynchronous search. 135 private final AsyncRequestID asyncRequestID; 136 137 // Indicates whether this entry source has been closed. 138 private final AtomicBoolean closed; 139 140 // The search result for the search operation. 141 private final AtomicReference<SearchResult> searchResult; 142 143 // Indicates whether to close the connection when this entry source is closed. 144 private final boolean closeConnection; 145 146 // The connection that will be used to read the entries. 147 private final LDAPConnection connection; 148 149 // The queue from which entries will be read. 150 private final LinkedBlockingQueue<Object> queue; 151 152 153 154 /** 155 * Creates a new LDAP entry source with the provided information. 156 * 157 * @param connection The connection to the directory server from which 158 * the entries will be read. It must not be 159 * {@code null}. 160 * @param searchRequest The search request that will be used to identify 161 * which entries should be returned. It must not be 162 * {@code null}, and it must not be configured with a 163 * {@link SearchResultListener}. 164 * @param closeConnection Indicates whether the provided connection should 165 * be closed whenever all of the entries have been 166 * read, or if the {@link #close} method is called. 167 * 168 * @throws LDAPException If there is a problem with the provided search 169 * request or when trying to communicate with the 170 * directory server over the provided connection. 171 */ 172 public LDAPEntrySource(final LDAPConnection connection, 173 final SearchRequest searchRequest, 174 final boolean closeConnection) 175 throws LDAPException 176 { 177 this(connection, searchRequest, closeConnection, 100); 178 } 179 180 181 182 /** 183 * Creates a new LDAP entry source with the provided information. 184 * 185 * @param connection The connection to the directory server from which 186 * the entries will be read. It must not be 187 * {@code null}. 188 * @param searchRequest The search request that will be used to identify 189 * which entries should be returned. It must not be 190 * {@code null}, and it must not be configured with a 191 * {@link SearchResultListener}. 192 * @param closeConnection Indicates whether the provided connection should 193 * be closed whenever all of the entries have been 194 * read, or if the {@link #close} method is called. 195 * @param queueSize The size of the internal queue used to hold search 196 * result entries until they can be consumed by the 197 * {@link #nextEntry} method. The value must be 198 * greater than zero. 199 * 200 * @throws LDAPException If there is a problem with the provided search 201 * request or when trying to communicate with the 202 * directory server over the provided connection. 203 */ 204 public LDAPEntrySource(final LDAPConnection connection, 205 final SearchRequest searchRequest, 206 final boolean closeConnection, 207 final int queueSize) 208 throws LDAPException 209 { 210 ensureNotNull(connection, searchRequest); 211 ensureTrue(queueSize > 0, 212 "LDAPEntrySource.queueSize must be greater than 0."); 213 214 this.connection = connection; 215 this.closeConnection = closeConnection; 216 217 if (searchRequest.getSearchResultListener() != null) 218 { 219 throw new LDAPException(ResultCode.PARAM_ERROR, 220 ERR_LDAP_ENTRY_SOURCE_REQUEST_HAS_LISTENER.get()); 221 } 222 223 closed = new AtomicBoolean(false); 224 searchResult = new AtomicReference<SearchResult>(); 225 queue = new LinkedBlockingQueue<Object>(queueSize); 226 227 final SearchRequest r = new SearchRequest(this, searchRequest.getControls(), 228 searchRequest.getBaseDN(), searchRequest.getScope(), 229 searchRequest.getDereferencePolicy(), searchRequest.getSizeLimit(), 230 searchRequest.getTimeLimitSeconds(), searchRequest.typesOnly(), 231 searchRequest.getFilter(), searchRequest.getAttributes()); 232 asyncRequestID = connection.asyncSearch(r); 233 } 234 235 236 237 /** 238 * {@inheritDoc} 239 */ 240 @Override() 241 public Entry nextEntry() 242 throws EntrySourceException 243 { 244 while (true) 245 { 246 if (closed.get() && queue.isEmpty()) 247 { 248 return null; 249 } 250 251 final Object o; 252 try 253 { 254 o = queue.poll(10L, TimeUnit.MILLISECONDS); 255 } 256 catch (InterruptedException ie) 257 { 258 debugException(ie); 259 continue; 260 } 261 262 if (o != null) 263 { 264 if (o == END_OF_RESULTS) 265 { 266 return null; 267 } 268 else if (o instanceof Entry) 269 { 270 return (Entry) o; 271 } 272 else 273 { 274 throw (EntrySourceException) o; 275 } 276 } 277 } 278 } 279 280 281 282 /** 283 * {@inheritDoc} 284 */ 285 @Override() 286 public void close() 287 { 288 closeInternal(true); 289 } 290 291 292 293 /** 294 * Closes this LDAP entry source. 295 * 296 * @param abandon Indicates whether to attempt to abandon the search. 297 */ 298 private void closeInternal(final boolean abandon) 299 { 300 addToQueue(END_OF_RESULTS); 301 302 if (closed.compareAndSet(false, true)) 303 { 304 if (abandon) 305 { 306 try 307 { 308 connection.abandon(asyncRequestID); 309 } 310 catch (Exception e) 311 { 312 debugException(e); 313 } 314 } 315 316 if (closeConnection) 317 { 318 connection.close(); 319 } 320 } 321 } 322 323 324 325 /** 326 * Retrieves the search result for the search operation, if available. It 327 * will not be available until the search has completed (as indicated by a 328 * {@code null} return value from the {@link #nextEntry} method). 329 * 330 * @return The search result for the search operation, or {@code null} if it 331 * is not available (e.g., because the search has not yet completed). 332 */ 333 public SearchResult getSearchResult() 334 { 335 return searchResult.get(); 336 } 337 338 339 340 /** 341 * {@inheritDoc} This is intended for internal use only and should not be 342 * called by anything outside of the LDAP SDK itself. 343 */ 344 @InternalUseOnly() 345 public void searchEntryReturned(final SearchResultEntry searchEntry) 346 { 347 addToQueue(searchEntry); 348 } 349 350 351 352 /** 353 * {@inheritDoc} This is intended for internal use only and should not be 354 * called by anything outside of the LDAP SDK itself. 355 */ 356 @InternalUseOnly() 357 public void searchReferenceReturned( 358 final SearchResultReference searchReference) 359 { 360 addToQueue(new SearchResultReferenceEntrySourceException(searchReference)); 361 } 362 363 364 365 /** 366 * {@inheritDoc} This is intended for internal use only and should not be 367 * called by anything outside of the LDAP SDK itself. 368 */ 369 @InternalUseOnly() 370 public void searchResultReceived(final AsyncRequestID requestID, 371 final SearchResult searchResult) 372 { 373 this.searchResult.set(searchResult); 374 375 if (! searchResult.getResultCode().equals(ResultCode.SUCCESS)) 376 { 377 addToQueue(new EntrySourceException(false, 378 new LDAPSearchException(searchResult))); 379 } 380 381 closeInternal(false); 382 } 383 384 385 386 /** 387 * Adds the provided object to the queue, waiting as long as needed until it 388 * has been added. 389 * 390 * @param o The object to be added. It must not be {@code null}. 391 */ 392 private void addToQueue(final Object o) 393 { 394 while (true) 395 { 396 if (closed.get()) 397 { 398 return; 399 } 400 401 try 402 { 403 if (queue.offer(o, 100L, TimeUnit.MILLISECONDS)) 404 { 405 return; 406 } 407 } 408 catch (InterruptedException ie) 409 { 410 debugException(ie); 411 } 412 } 413 } 414}