001/* 002 * Copyright 2011-2014 UnboundID Corp. 003 * All Rights Reserved. 004 */ 005/* 006 * Copyright (C) 2011-2014 UnboundID Corp. 007 * 008 * This program is free software; you can redistribute it and/or modify 009 * it under the terms of the GNU General Public License (GPLv2 only) 010 * or the terms of the GNU Lesser General Public License (LGPLv2.1 only) 011 * as published by the Free Software Foundation. 012 * 013 * This program is distributed in the hope that it will be useful, 014 * but WITHOUT ANY WARRANTY; without even the implied warranty of 015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 016 * GNU General Public License for more details. 017 * 018 * You should have received a copy of the GNU General Public License 019 * along with this program; if not, see <http://www.gnu.org/licenses>. 020 */ 021package com.unboundid.util; 022 023 024 025import java.io.File; 026import java.io.FileInputStream; 027import java.io.InputStream; 028import java.io.IOException; 029import java.util.ArrayList; 030import java.util.Collection; 031import java.util.Iterator; 032 033import static com.unboundid.util.UtilityMessages.*; 034 035 036 037/** 038 * This class provides an input stream implementation that can aggregate 039 * multiple input streams. When reading data from this input stream, it will 040 * read from the first input stream until the end of it is reached, at point it 041 * will close it and start reading from the next one, and so on until all input 042 * streams have been exhausted. Closing the aggregate input stream will cause 043 * all remaining input streams to be closed. 044 */ 045@ThreadSafety(level=ThreadSafetyLevel.NOT_THREADSAFE) 046public final class AggregateInputStream 047 extends InputStream 048{ 049 // The currently-active input stream. 050 private volatile InputStream activeInputStream; 051 052 // The iterator that will be used to access the input streams. 053 private final Iterator<InputStream> streamIterator; 054 055 056 057 /** 058 * Creates a new aggregate input stream that will use the provided set of 059 * input streams. 060 * 061 * @param inputStreams The input streams to be used by this aggregate input 062 * stream. It must not be {@code null}. 063 */ 064 public AggregateInputStream(final InputStream... inputStreams) 065 { 066 this(StaticUtils.toList(inputStreams)); 067 } 068 069 070 071 /** 072 * Creates a new aggregate input stream that will use the provided set of 073 * input streams. 074 * 075 * @param inputStreams The input streams to be used by this aggregate input 076 * stream. It must not be {@code null}. 077 */ 078 public AggregateInputStream( 079 final Collection<? extends InputStream> inputStreams) 080 { 081 Validator.ensureNotNull(inputStreams); 082 083 final ArrayList<InputStream> streamList = 084 new ArrayList<InputStream>(inputStreams); 085 streamIterator = streamList.iterator(); 086 activeInputStream = null; 087 } 088 089 090 091 /** 092 * Creates a new aggregate input stream that will read data from the specified 093 * files. 094 * 095 * @param files The set of files to be read by this aggregate input stream. 096 * It must not be {@code null}. 097 * 098 * @throws IOException If a problem is encountered while attempting to 099 * create input streams for the provided files. 100 */ 101 public AggregateInputStream(final File... files) 102 throws IOException 103 { 104 Validator.ensureNotNull(files); 105 106 final ArrayList<InputStream> streamList = 107 new ArrayList<InputStream>(files.length); 108 109 IOException ioException = null; 110 for (final File f : files) 111 { 112 try 113 { 114 streamList.add(new FileInputStream(f)); 115 } 116 catch (final IOException ioe) 117 { 118 Debug.debugException(ioe); 119 ioException = ioe; 120 break; 121 } 122 } 123 124 if (ioException != null) 125 { 126 for (final InputStream s : streamList) 127 { 128 if (s != null) 129 { 130 try 131 { 132 s.close(); 133 } 134 catch (final Exception e) 135 { 136 Debug.debugException(e); 137 } 138 } 139 } 140 141 throw ioException; 142 } 143 144 streamIterator = streamList.iterator(); 145 activeInputStream = null; 146 } 147 148 149 150 /** 151 * Reads the next byte of data from the current active input stream, switching 152 * to the next input stream in the set if appropriate. 153 * 154 * @return The next byte of data that was read, or -1 if all streams have 155 * been exhausted. 156 * 157 * @throws IOException If a problem is encountered while attempting to read 158 * data from an input stream. 159 */ 160 @Override() 161 public int read() 162 throws IOException 163 { 164 while (true) 165 { 166 if (activeInputStream == null) 167 { 168 if (streamIterator.hasNext()) 169 { 170 activeInputStream = streamIterator.next(); 171 continue; 172 } 173 else 174 { 175 return -1; 176 } 177 } 178 179 final int byteRead = activeInputStream.read(); 180 if (byteRead < 0) 181 { 182 activeInputStream.close(); 183 activeInputStream = null; 184 } 185 else 186 { 187 return byteRead; 188 } 189 } 190 } 191 192 193 194 /** 195 * Reads data from the current active input stream into the provided array, 196 * switching to the next input stream in the set if appropriate. 197 * 198 * @param b The array into which the data read should be placed, starting 199 * with an index of zero. It must not be {@code null}. 200 * 201 * @return The number of bytes read into the array, or -1 if all streams have 202 * been exhausted. 203 * 204 * @throws IOException If a problem is encountered while attempting to read 205 * data from an input stream. 206 */ 207 @Override() 208 public int read(final byte[] b) 209 throws IOException 210 { 211 return read(b, 0, b.length); 212 } 213 214 215 216 /** 217 * Reads data from the current active input stream into the provided array, 218 * switching to the next input stream in the set if appropriate. 219 * 220 * @param b The array into which the data read should be placed. It must 221 * not be {@code null}. 222 * @param off The position in the array at which to start writing data. 223 * @param len The maximum number of bytes that may be read. 224 * 225 * @return The number of bytes read into the array, or -1 if all streams have 226 * been exhausted. 227 * 228 * @throws IOException If a problem is encountered while attempting to read 229 * data from an input stream. 230 */ 231 @Override() 232 public int read(final byte[] b, final int off, final int len) 233 throws IOException 234 { 235 while (true) 236 { 237 if (activeInputStream == null) 238 { 239 if (streamIterator.hasNext()) 240 { 241 activeInputStream = streamIterator.next(); 242 continue; 243 } 244 else 245 { 246 return -1; 247 } 248 } 249 250 final int bytesRead = activeInputStream.read(b, off, len); 251 if (bytesRead < 0) 252 { 253 activeInputStream.close(); 254 activeInputStream = null; 255 } 256 else 257 { 258 return bytesRead; 259 } 260 } 261 } 262 263 264 265 /** 266 * Attempts to skip and discard up to the specified number of bytes from the 267 * input stream. 268 * 269 * @param n The number of bytes to attempt to skip. 270 * 271 * @return The number of bytes actually skipped. 272 * 273 * @throws IOException If a problem is encountered while attempting to skip 274 * data from the input stream. 275 */ 276 @Override() 277 public long skip(final long n) 278 throws IOException 279 { 280 if (activeInputStream == null) 281 { 282 if (streamIterator.hasNext()) 283 { 284 activeInputStream = streamIterator.next(); 285 return activeInputStream.skip(n); 286 } 287 else 288 { 289 return 0L; 290 } 291 } 292 else 293 { 294 return activeInputStream.skip(n); 295 } 296 } 297 298 299 300 /** 301 * Retrieves an estimate of the number of bytes that can be read without 302 * blocking. 303 * 304 * @return An estimate of the number of bytes that can be read without 305 * blocking. 306 * 307 * @throws IOException If a problem is encountered while attempting to make 308 * the determination. 309 */ 310 @Override() 311 public int available() 312 throws IOException 313 { 314 if (activeInputStream == null) 315 { 316 if (streamIterator.hasNext()) 317 { 318 activeInputStream = streamIterator.next(); 319 return activeInputStream.available(); 320 } 321 else 322 { 323 return 0; 324 } 325 } 326 else 327 { 328 return activeInputStream.available(); 329 } 330 } 331 332 333 334 /** 335 * Indicates whether this input stream supports the use of the {@code mark} 336 * and {@code reset} methods. This implementation does not support that 337 * capability. 338 * 339 * @return {@code false} to indicate that this input stream implementation 340 * does not support the use of {@code mark} and {@code reset}. 341 */ 342 @Override() 343 public boolean markSupported() 344 { 345 return false; 346 } 347 348 349 350 /** 351 * Marks the current position in the input stream. This input stream does not 352 * support this functionality, so no action will be taken. 353 * 354 * @param readLimit The maximum number of bytes that the caller may wish to 355 * read before being able to reset the stream. 356 */ 357 @Override() 358 public void mark(final int readLimit) 359 { 360 // No implementation is required. 361 } 362 363 364 365 /** 366 * Attempts to reset the position of this input stream to the mark location. 367 * This implementation does not support {@code mark} and {@code reset} 368 * functionality, so this method will always throw an exception. 369 * 370 * @throws IOException To indicate that reset is not supported. 371 */ 372 @Override() 373 public void reset() 374 throws IOException 375 { 376 throw new IOException(ERR_AGGREGATE_INPUT_STREAM_MARK_NOT_SUPPORTED.get()); 377 } 378 379 380 381 /** 382 * Closes this input stream. All associated input streams will be closed. 383 * 384 * @throws IOException If an exception was encountered while attempting to 385 * close any of the associated streams. Note that even 386 * if an exception is encountered, an attempt will be 387 * made to close all streams. 388 */ 389 @Override() 390 public void close() 391 throws IOException 392 { 393 IOException firstException = null; 394 395 if (activeInputStream != null) 396 { 397 try 398 { 399 activeInputStream.close(); 400 } 401 catch (final IOException ioe) 402 { 403 Debug.debugException(ioe); 404 firstException = ioe; 405 } 406 activeInputStream = null; 407 } 408 409 while (streamIterator.hasNext()) 410 { 411 final InputStream s = streamIterator.next(); 412 try 413 { 414 s.close(); 415 } 416 catch (final IOException ioe) 417 { 418 Debug.debugException(ioe); 419 if (firstException == null) 420 { 421 firstException = ioe; 422 } 423 } 424 } 425 426 if (firstException != null) 427 { 428 throw firstException; 429 } 430 } 431}