001/*
002 * Copyright 2014 UnboundID Corp.
003 * All Rights Reserved.
004 */
005/*
006 * Copyright (C) 2014 UnboundID Corp.
007 *
008 * This program is free software; you can redistribute it and/or modify
009 * it under the terms of the GNU General Public License (GPLv2 only)
010 * or the terms of the GNU Lesser General Public License (LGPLv2.1 only)
011 * as published by the Free Software Foundation.
012 *
013 * This program is distributed in the hope that it will be useful,
014 * but WITHOUT ANY WARRANTY; without even the implied warranty of
015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
016 * GNU General Public License for more details.
017 *
018 * You should have received a copy of the GNU General Public License
019 * along with this program; if not, see <http://www.gnu.org/licenses>.
020 */
021package com.unboundid.util;
022
023
024
025import java.io.BufferedReader;
026import java.io.File;
027import java.io.FileReader;
028import java.io.IOException;
029import java.io.PrintWriter;
030import java.io.Reader;
031import java.util.ArrayList;
032import java.util.Arrays;
033import java.util.Collections;
034import java.util.Iterator;
035import java.util.LinkedHashMap;
036import java.util.LinkedHashSet;
037import java.util.LinkedList;
038import java.util.List;
039import java.util.Map;
040import java.util.Set;
041import java.util.concurrent.CountDownLatch;
042import java.util.concurrent.TimeUnit;
043import java.util.regex.Pattern;
044
045import com.unboundid.util.args.ArgumentException;
046import com.unboundid.util.args.DurationArgument;
047
048import static com.unboundid.util.Debug.*;
049import static com.unboundid.util.UtilityMessages.*;
050
051
052
053/**
054 * This class allows a FixedRateBarrier to change dynamically.  The rate changes
055 * are governed by lines read from a {@code Reader} (typically backed by a
056 * file). The input starts with a header that provides some global options and
057 * then has a list of lines, where each line contains a single rate per second,
058 * a comma, and a duration to maintain that rate.  Rates are specified as an
059 * absolute rate per second or as a rate relative to the base rate per second.
060 * The duration is an integer followed by a time unit (ms=milliseconds,
061 * s=seconds, m=minutes, h=hours, and d=days).
062 * <BR><BR>
 * The following simple example will run at a target rate of 1000 per second
 * for one minute, and then 10000 per second for 10 seconds.
 * <pre>
 *   format=rate-and-duration
 *   END HEADER
 *   1000,1m
 *   10000,10s
 * </pre>
070 * <BR>
 * The following example has a default duration of one minute, and will repeat
 * the two intervals until this RateAdjustor is shut down.  The first interval
 * is run for the default of 1 minute at two and a half times the base rate,
 * and the second is run for 10 seconds at 10000 per second.
 * <pre>
 *   format=rate-and-duration
 *   default-duration=1m
 *   repeat=true
 *   END HEADER
 *   2.5X
 *   10000,10s
 * </pre>
082 * A {@code RateAdjustor} is a daemon thread.  It is necessary to call the
083 * {@code start()} method to start the thread and begin the rate changes.
 * Once it has finished processing the rates, the thread will complete.
085 * It can be stopped prematurely by calling {@code shutDown()}.
086 * <BR><BR>
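 * The header consists of {@code name=value} lines and must be terminated by a
 * line containing only the text {@code END HEADER}.  It can contain the
 * following options:
 * <UL>
 *   <LI>{@code format} (required):  This must currently have the value
 *       {@code rate-and-duration}.</LI>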
091 *   <LI>{@code default-duration} (optional):  This can specify a default
092 *       duration for intervals that do not include a duration.  The format
093 *       is an integer followed by a time unit as described above.</LI>
094 *   <LI>{@code repeat} (optional):  If this has a value of {@code true}, then
095 *       the rates in the input will be repeated until {@code shutDown()} is
096 *       called.</LI>
097 * </UL>
098 */
099@ThreadSafety(level = ThreadSafetyLevel.MOSTLY_THREADSAFE)
100public final class RateAdjustor extends Thread
101{
102  /**
103   * This starts a comment in the input.
104   */
105  public static final char COMMENT_START = '#';
106
107
108
109  /**
110   * The text that must appear on a line by itself in order to denote that the
111   * end of the file header has been reached.
112   */
113  public static final String END_HEADER_TEXT = "END HEADER";
114
115
116
117  /**
118   * The header key that represents the default duration.
119   */
120  public static final String DEFAULT_DURATION_KEY = "default-duration";
121
122
123
124  /**
125   * The header key that represents the format of the file.
126   */
127  public static final String FORMAT_KEY = "format";
128
129
130
131  /**
132   * The value of the format key that represents a list of rates and durations
133   * within the input file.
134   */
135  public static final String FORMAT_VALUE_RATE_DURATION = "rate-and-duration";
136
137
138
139  /**
140   * A list of all formats that we support.
141   */
142  public static final List<String> FORMATS =
143       Arrays.asList(FORMAT_VALUE_RATE_DURATION);
144
145
146
147  /**
148   * The header key that represents whether the input should be repeated.
149   */
150  public static final String REPEAT_KEY = "repeat";
151
152
153
154  /**
155   * A list of all header keys that we support.
156   */
157  public static final List<String> KEYS =
158       Arrays.asList(DEFAULT_DURATION_KEY, FORMAT_KEY, REPEAT_KEY);
159
160
161
162  // Other headers to consider:
163  // * rate-multiplier, so you can easily proportionally increase or decrease
164  //   every target rate without changing all the target rates directly.
165  // * duration-multiplier, so you can easily proportionally increase or
166  //   decrease the length of time to spend at target rates.
167  // * rate-change-behavior, so you can specify the behavior that should be
168  //   exhibited when transitioning from one rate to another (e.g., instant
169  //   jump, linear acceleration, sine-based acceleration, etc.).
170  // * jitter, so we can introduce some amount of random jitter in the target
171  //   rate (in which the actual target rate may be frequently adjusted to be
172  //   slightly higher or lower than the designated target rate).
173  // * spike, so we can introduce periodic, substantial increases in the target
174  //   rate.
175
176
177
178  // The barrier whose rate is adjusted.
179  private final FixedRateBarrier barrier;
180
181  // A list of rates per second and the number of milliseconds that the
182  // specified rate should be maintained.
183  private final List<ObjectPair<Double,Long>> ratesAndDurations;
184
185  // If this is true, then the ratesAndDurations will be repeated until this is
186  // shut down.
187  private final boolean repeat;
188
189  // Set to true when this should shut down.
190  private volatile boolean shutDown = false;
191
192  // This is used to make sure we set the initial rate before start() returns.
193  private final CountDownLatch initialRateSetLatch = new CountDownLatch(1);
194
195  // This allows us to interrupt when we are sleeping.
196  private final WakeableSleeper sleeper = new WakeableSleeper();
197
198
199
200  /**
201   * Returns a new RateAdjustor with the specified parameters.  See the
202   * class-level javadoc for more information.
203   *
204   * @param  barrier            The barrier to update based on the specified
205   *                            rates.
206   * @param  baseRatePerSecond  The baseline rate per second, or {@code null}
207   *                            if none was specified.
208   * @param  rates              A file containing a list of rates and durations
209   *                            as described in the class-level javadoc.
210   *
211   * @return  A new RateAdjustor constructed from the specified parameters.
212   *
213   * @throws  IOException               If there is a problem reading from
214   *                                    the rates Reader.
215   * @throws  IllegalArgumentException  If there is a problem with the rates
216   *                                    input.
217   */
218  public static RateAdjustor newInstance(final FixedRateBarrier barrier,
219                                         final Integer baseRatePerSecond,
220                                         final File rates)
221         throws IOException, IllegalArgumentException
222  {
223    final Reader reader = new FileReader(rates);
224    return new RateAdjustor(
225         barrier,
226         (baseRatePerSecond == null) ? 0 : baseRatePerSecond,
227         reader);
228  }
229
230
231
232  /**
233   * Retrieves a string that may be used as the description of the argument that
234   * specifies the path to a variable rate data file for use in conjunction with
235   * this rate adjustor.
236   *
237   * @param  genArgName  The name of the argument that may be used to generate a
238   *                     sample variable rate data file.
239   *
240   * @return   A string that may be used as the description of the argument that
241   *           specifies the path to a variable rate data file for use in
242   *           conjunction with this rate adjustor.
243   */
244  public static String getVariableRateDataArgumentDescription(
245                            final String genArgName)
246  {
247    return INFO_RATE_ADJUSTOR_VARIABLE_RATE_DATA_ARG_DESCRIPTION.get(
248         genArgName);
249  }
250
251
252
253  /**
254   * Retrieves a string that may be used as the description of the argument that
255   * generates a sample variable rate data file that serves as documentation of
256   * the variable rate data format.
257   *
258   * @param  dataFileArgName  The name of the argument that specifies the path
259   *                          to a file
260   *
261   * @return   A string that may be used as the description of the argument that
262   *           generates a sample variable rate data file that serves as
263   *           documentation of the variable rate data format.
264   */
265  public static String getGenerateSampleVariableRateFileDescription(
266                            final String dataFileArgName)
267  {
268    return INFO_RATE_ADJUSTOR_GENERATE_SAMPLE_RATE_FILE_ARG_DESCRIPTION.get(
269         dataFileArgName);
270  }
271
272
273
274  /**
275   * Writes a sample variable write data file to the specified location.
276   *
277   * @param  f  The path to the file to be written.
278   *
279   * @throws  IOException  If a problem is encountered while writing to the
280   *                       specified file.
281   */
282  public static void writeSampleVariableRateFile(final File f)
283         throws IOException
284  {
285    final PrintWriter w = new PrintWriter(f);
286    try
287    {
288      w.println("# This is an example variable rate data file.  All blank " +
289           "lines will be ignored.");
290      w.println("# All lines starting with the '#' character are considered " +
291           "comments and will");
292      w.println("# also be ignored.");
293      w.println();
294      w.println("# The beginning of the file must be a header containing " +
295           "properties pertaining");
296      w.println("# to the variable rate data.  All headers must be in the " +
297           "format 'name=value',");
298      w.println("# in which any spaces surrounding the equal sign will be " +
299           "ignored.");
300      w.println();
301      w.println("# The first header should be the 'format' header, which " +
302           "specifies the format");
303      w.println("# for the variable rate data file.  This header is " +
304           "required.  At present, the");
305      w.println("# only supported format is 'rate-and-duration', although " +
306           "additional formats may");
307      w.println("# be added in the future.");
308      w.println("format = rate-and-duration");
309      w.println();
310      w.println("# The optional 'default-duration' header may be used to " +
311           "specify a duration that");
312      w.println("# will be used for any interval that does not explicitly " +
313           "specify a duration.");
314      w.println("# The duration must consist of a positive integer value " +
315           "followed by a time");
316      w.println("# unit (with zero or more spaces separating the integer " +
317           "value from the unit).");
318      w.println("# The supported time units are:");
319      w.println("#");
320      w.println("# - nanoseconds, nanosecond, nanos, nano, ns");
321      w.println("# - microseconds, microseconds, micros, micro, us");
322      w.println("# - milliseconds, millisecond, millis, milli, ms");
323      w.println("# - seconds, second, secs, sec, s");
324      w.println("# - minutes, minute, mins, min, m");
325      w.println("# - hours, hour, hrs, hr, h");
326      w.println("# - days, day, d");
327      w.println("#");
328      w.println("# If no 'default-duration' header is present, then every " +
329           "data interval must");
330      w.println("# include an explicitly-specified duration.");
331      w.println("default-duration = 10 seconds");
332      w.println();
333      w.println("# The optional 'repeat' header may be used to indicate how " +
334           "the tool should");
335      w.println("# behave once the end of the variable rate data definitions " +
336           "has been reached.");
337      w.println("# If the 'repeat' header is present with a value of 'true', " +
338           "then the tool will");
339      w.println("# operate in an endless loop, returning to the beginning of " +
340           "the variable rate");
341      w.println("# definitions once the end has been reached.  If the " +
342           "'repeat' header is present");
343      w.println("# with a value of 'false', or if the 'repeat' header is " +
344           "absent, then the tool");
345      w.println("# will exit after it has processed all of the variable " +
346           "rate definitions.");
347      w.println("repeat = true");
348      w.println();
349      w.println("# After all header properties have been specified, the end " +
350           "of the header must");
351      w.println("# be signified with a line containing only the text 'END " +
352           "HEADER'.");
353      w.println("END HEADER");
354      w.println();
355      w.println();
356      w.println("# After the header is complete, the variable rate " +
357           "definitions should be");
358      w.println("# provided.  Each definition should be given on a line by " +
359           "itself, and should");
360      w.println("# contain a target rate per second and an optional length " +
361           "of time to maintain");
362      w.println("# that rate.");
363      w.println("#");
364      w.println("# The target rate must always be present in a variable " +
365           "rate definition.  It may");
366      w.println("# be either a positive integer value that specifies the " +
367           "absolute target rate");
368      w.println("# per second (e.g., a value of '1000' indicates a target " +
369           "rate of 1000");
370      w.println("# operations per second), or it may be a floating-point " +
371           "value followed by the");
372      w.println("# letter 'x' to indicate that it is a multiplier of the " +
373           "value specified by the");
374      w.println("# '--ratePerSecond' argument (e.g., if the " +
375           "'--ratePerSecond' argument is");
376      w.println("# present with a value of 1000, then a target rate value " +
377           "of '0.75x' indicates a");
378      w.println("# target rate that is 75% of the '--ratePerSecond' value, " +
379           "or 750 operations per");
380      w.println("# second).  If the latter format is used, then the " +
381           "'--ratePerSecond' argument");
382      w.println("# must be provided.");
383      w.println("#");
384      w.println("# The duration may optionally be present in a variable " +
385           "rate definition.  If");
386      w.println("# present, it must be separated from the target rate by a " +
387           "comma (and there may");
388      w.println("# be zero or more spaces on either side of the comma).  " +
389           "The duration must be in");
390      w.println("# the same format as specified in the description of the " +
391           "'default-duration'");
392      w.println("# header above (i.e., a positive integer followed by a " +
393           "time unit).  If a");
394      w.println("# variable rate definition does not include a duration, " +
395           "then the");
396      w.println("# 'default-duration' header must have been specified, and " +
397           "that default duration");
398      w.println("# will be used for that variable rate definition.");
399      w.println("#");
400      w.println("# The following variable rate definitions may be used to " +
401           "stairstep the target");
402      w.println("# rate from 1000 operations per second to 10000 operations " +
403           "per second, in");
404      w.println("# increments of 1000 operations per second, spending one " +
405           "minute at each level.");
406      w.println("# If the 'repeat' header is present with a value of 'true', " +
407           "then the process");
408      w.println("# will start back over at 1000 operations per second after " +
409           "completing one");
410      w.println("# minute at 10000 operations per second.  Otherwise, the " +
411           "tool will exit after");
412      w.println("# completing the 10000 operation-per-second interval.");
413      w.println("1000, 1 minute");
414      w.println("2000, 1 minute");
415      w.println("3000, 1 minute");
416      w.println("4000, 1 minute");
417      w.println("5000, 1 minute");
418      w.println("6000, 1 minute");
419      w.println("7000, 1 minute");
420      w.println("8000, 1 minute");
421      w.println("9000, 1 minute");
422      w.println("10000, 1 minute");
423      w.println();
424    }
425    finally
426    {
427      w.close();
428    }
429  }
430
431
432
433  /**
434   * Constructs a new RateAdjustor with the specified parameters.  See the
435   * class-level javadoc for more information.
436   *
437   * @param  barrier            The barrier to update based on the specified
438   *                            rates.
439   * @param  baseRatePerSecond  The baseline rate per second, or 0 if none was
440   *                            specified.
441   * @param  rates              A list of rates and durations as described in
442   *                            the class-level javadoc.  The reader will
443   *                            always be closed before this method returns.
444   *
445   * @throws  IOException               If there is a problem reading from
446   *                                    the rates Reader.
447   * @throws  IllegalArgumentException  If there is a problem with the rates
448   *                                    input.
449   */
450  public RateAdjustor(final FixedRateBarrier barrier,
451                      final long baseRatePerSecond,
452                      final Reader rates)
453         throws IOException, IllegalArgumentException
454  {
455    // Read the header first.
456    final List<String> lines;
457    try
458    {
459      Validator.ensureNotNull(barrier, rates);
460      setDaemon(true);
461      this.barrier = barrier;
462
463      lines = readLines(rates);
464    }
465    finally
466    {
467      rates.close();
468    }
469
470    final Map<String,String> header = consumeHeader(lines);
471
472    final Set<String> invalidKeys = new LinkedHashSet<String>(header.keySet());
473    invalidKeys.removeAll(KEYS);
474    if (! invalidKeys.isEmpty())
475    {
476      throw new IllegalArgumentException(
477           ERR_RATE_ADJUSTOR_INVALID_KEYS.get(invalidKeys, KEYS));
478    }
479
480    final String format = header.get(FORMAT_KEY);
481    if (format == null)
482    {
483      throw new IllegalArgumentException(ERR_RATE_ADJUSTOR_MISSING_FORMAT.get(
484           FORMAT_KEY, FORMATS, COMMENT_START));
485    }
486
487    if (! format.equals(FORMAT_VALUE_RATE_DURATION))
488    {
489      // For now this is the only format that we support.
490      throw new IllegalArgumentException(
491           ERR_RATE_ADJUSTOR_INVALID_FORMAT.get(format, FORMAT_KEY, FORMATS));
492    }
493
494    repeat = Boolean.parseBoolean(header.get(REPEAT_KEY));
495
496    // This will be non-zero if it's set in the input.
497    long defaultDurationMillis = 0;
498    final String defaultDurationStr = header.get(DEFAULT_DURATION_KEY);
499    if (defaultDurationStr != null)
500    {
501      try
502      {
503        defaultDurationMillis = DurationArgument.parseDuration(
504             defaultDurationStr, TimeUnit.MILLISECONDS);
505      }
506      catch (final ArgumentException e)
507      {
508        debugException(e);
509        throw new IllegalArgumentException(
510             ERR_RATE_ADJUSTOR_INVALID_DEFAULT_DURATION.get(
511                        defaultDurationStr, e.getExceptionMessage()),
512             e);
513      }
514    }
515
516    // Now parse out the rates and durations, which will look like this:
517    //  1000,1s
518    //  1.5,1d
519    //  0.5X, 1m
520    //  # Duration can be omitted if default-duration header was included.
521    //  1000
522    final List<ObjectPair<Double,Long>> ratesAndDurationList =
523            new ArrayList<ObjectPair<Double,Long>>(10);
524    final Pattern splitPattern = Pattern.compile("\\s*,\\s*");
525    for (final String fullLine: lines)
526    {
527      // Strip out comments and white space.
528      String line = fullLine;
529      final int commentStart = fullLine.indexOf(COMMENT_START);
530      if (commentStart >= 0)
531      {
532        line = line.substring(0, commentStart);
533      }
534      line = line.trim();
535
536      if (line.length() == 0)
537      {
538        continue;
539      }
540
541      final String[] fields = splitPattern.split(line);
542      if (!((fields.length == 2) ||
543            ((fields.length == 1) && defaultDurationMillis != 0)))
544      {
545        throw new IllegalArgumentException(ERR_RATE_ADJUSTOR_INVALID_LINE.get(
546             fullLine, DEFAULT_DURATION_KEY));
547      }
548
549      String rateStr = fields[0];
550
551      boolean isRateMultiplier = false;
552      if (rateStr.endsWith("X") || rateStr.endsWith("x"))
553      {
554        rateStr = rateStr.substring(0, rateStr.length() - 1).trim();
555        isRateMultiplier = true;
556      }
557
558      double rate;
559      try
560      {
561        rate = Double.parseDouble(rateStr);
562      }
563      catch (final NumberFormatException e)
564      {
565        debugException(e);
566        throw new IllegalArgumentException(
567             ERR_RATE_ADJUSTOR_INVALID_RATE.get(rateStr, fullLine), e);
568      }
569
570      // Values that look like 2X are a multiplier on the base rate.
571      if (isRateMultiplier)
572      {
573        if (baseRatePerSecond <= 0)
574        {
575          throw new IllegalArgumentException(
576                  ERR_RATE_ADJUSTOR_RELATIVE_RATE_WITHOUT_BASELINE.get(
577                          rateStr, fullLine));
578        }
579
580        rate *= baseRatePerSecond;
581      }
582
583      final long durationMillis;
584      if (fields.length < 2)
585      {
586        durationMillis = defaultDurationMillis;
587      }
588      else
589      {
590        final String duration = fields[1];
591        try
592        {
593          durationMillis = DurationArgument.parseDuration(
594                  duration, TimeUnit.MILLISECONDS);
595        }
596        catch (final ArgumentException e)
597        {
598          debugException(e);
599          throw new IllegalArgumentException(
600               ERR_RATE_ADJUSTOR_INVALID_DURATION.get(duration, fullLine,
601                    e.getExceptionMessage()),
602               e);
603        }
604      }
605
606      ratesAndDurationList.add(
607           new ObjectPair<Double,Long>(rate, durationMillis));
608    }
609    ratesAndDurations = Collections.unmodifiableList(ratesAndDurationList);
610  }
611
612
613
614  /**
615   * Starts this thread and waits for the initial rate to be set.
616   */
617  @Override
618  public void start()
619  {
620    super.start();
621
622    // Wait until the initial rate is set.  Assuming the caller starts this
623    // RateAdjustor before the FixedRateBarrier is used by other threads,
624    // this will guarantee that the initial rate is in place before the
625    // barrier is used.
626    try
627    {
628      initialRateSetLatch.await();
629    }
630    catch (final InterruptedException e)
631    {
632      debugException(e);
633    }
634  }
635
636
637
638  /**
639   * Adjusts the rate in FixedRateBarrier as described in the rates.
640   */
641  @Override
642  public void run()
643  {
644    try
645    {
646      if (ratesAndDurations.isEmpty())
647      {
648        return;
649      }
650
651      do
652      {
653        final List<ObjectPair<Double,Long>> ratesAndEndTimes =
654             new ArrayList<ObjectPair<Double,Long>>(ratesAndDurations.size());
655        long endTime = System.currentTimeMillis();
656        for (final ObjectPair<Double,Long> rateAndDuration : ratesAndDurations)
657        {
658          endTime += rateAndDuration.getSecond();
659          ratesAndEndTimes.add(new ObjectPair<Double,Long>(
660               rateAndDuration.getFirst(), endTime));
661        }
662
663        for (final ObjectPair<Double,Long> rateAndEndTime: ratesAndEndTimes)
664        {
665          if (shutDown)
666          {
667            return;
668          }
669
670          final double rate = rateAndEndTime.getFirst();
671          final long intervalMillis = barrier.getTargetRate().getFirst();
672          final int perInterval = calculatePerInterval(intervalMillis, rate);
673
674          barrier.setRate(intervalMillis, perInterval);
675
676          // Signal start() that we've set the initial rate.
677          if (initialRateSetLatch.getCount() > 0)
678          {
679            initialRateSetLatch.countDown();
680          }
681
682          // Hold at this rate for the specified duration.
683          final long durationMillis =
684               rateAndEndTime.getSecond() - System.currentTimeMillis();
685          if (durationMillis > 0L)
686          {
687            sleeper.sleep(durationMillis);
688          }
689        }
690      }
691      while (repeat);
692    }
693    finally
694    {
695      // Just in case we happened to be shutdown before we were started.
696      // We still want start() to be able to return.
697      if (initialRateSetLatch.getCount() > 0)
698      {
699        initialRateSetLatch.countDown();
700      }
701    }
702  }
703
704
705
706  /**
707   * Signals this to shut down.
708   */
709  public void shutDown()
710  {
711    shutDown = true;
712    sleeper.wakeup();
713  }
714
715
716
717  /**
718   * Returns the of rates and durations.  This is primarily here for testing
719   * purposes.
720   *
721   * @return  The list of rates and durations.
722   */
723  List<ObjectPair<Double,Long>> getRatesAndDurations()
724  {
725    return ratesAndDurations;
726  }
727
728
729
730  /**
731   * Calculates the rate per interval given the specified interval width
732   * and the target rate per second.  (This is static and non-private so that
733   * it can be unit tested.)
734   *
735   * @param intervalDurationMillis  The duration of the interval in
736   *                                milliseconds.
737   * @param ratePerSecond           The target rate per second.
738   *
739   * @return  The rate per interval, which will be at least 1.
740   */
741  static int calculatePerInterval(final long intervalDurationMillis,
742                                  final double ratePerSecond)
743  {
744    final double intervalDurationSeconds = intervalDurationMillis / 1000.0;
745    final double ratePerInterval = ratePerSecond * intervalDurationSeconds;
746    return (int)Math.max(1, Math.round(ratePerInterval));
747  }
748
749
750
751  /**
752   * This reads the header at the start of the file.  All blank lines and
753   * comment lines will be ignored.  The end of the header will be signified by
754   * a line containing only the text "END HEADER".  All non-blank, non-comment
755   * lines in the header must be in the format "name=value", where there may be
756   * zero or more spaces on either side of the equal sign, the name must not
757   * contain either the space or the equal sign character, and the value must
758   * not begin or end with a space.  Header lines must not contain partial-line
759   * comments.
760   *
761   * @param  lines  The lines of input that include the header.
762   *
763   * @return  A map of key/value pairs extracted from the header.
764   *
765   * @throws  IllegalArgumentException  If a problem is encountered while
766   *                                    parsing the header (e.g., a malformed
767   *                                    header line is encountered, multiple
768   *                                    headers have the same key, there is no
769   *                                    end of header marker, etc.).
770   */
771  static Map<String,String> consumeHeader(final List<String> lines)
772         throws IllegalArgumentException
773  {
774    // The header will look like this:
775    // key1=value1
776    // key2 = value2
777    // END HEADER
778    boolean endHeaderFound = false;
779    final Map<String,String> headerMap = new LinkedHashMap<String,String>(3);
780    final Iterator<String> lineIter = lines.iterator();
781    while (lineIter.hasNext())
782    {
783      final String line = lineIter.next().trim();
784      lineIter.remove();
785
786      if ((line.length() == 0) ||
787           line.startsWith(String.valueOf(COMMENT_START)))
788      {
789        continue;
790      }
791
792      if (line.equalsIgnoreCase(END_HEADER_TEXT))
793      {
794        endHeaderFound = true;
795        break;
796      }
797
798      final int equalPos = line.indexOf('=');
799      if (equalPos < 0)
800      {
801        throw new IllegalArgumentException(
802             ERR_RATE_ADJUSTOR_HEADER_NO_EQUAL.get(line));
803      }
804
805      final String key = line.substring(0, equalPos).trim();
806      if (key.length() == 0)
807      {
808        throw new IllegalArgumentException(
809             ERR_RATE_ADJUSTOR_HEADER_EMPTY_KEY.get(line));
810      }
811
812      final String newValue = line.substring(equalPos+1).trim();
813      final String existingValue = headerMap.get(key);
814      if (existingValue != null)
815      {
816        throw new IllegalArgumentException(
817             ERR_RATE_ADJUSTOR_DUPLICATE_HEADER_KEY.get(key, existingValue,
818                  newValue));
819      }
820
821      headerMap.put(key, newValue);
822    }
823
824    if (! endHeaderFound)
825    {
826      // This means we iterated across all lines without finding the end header
827      // marker.
828      throw new IllegalArgumentException(
829           ERR_RATE_ADJUSTOR_NO_END_HEADER_FOUND.get(END_HEADER_TEXT));
830    }
831
832    return headerMap;
833  }
834
835
836
837  /**
838   * Returns a list of the lines read from the specified Reader.
839   *
840   * @param  reader  The Reader to read from.
841   *
842   * @return  A list of the lines read from the specified Reader.
843   *
844   * @throws  IOException  If there is a problem reading from the Reader.
845   */
846  private static List<String> readLines(final Reader reader) throws IOException
847  {
848    final BufferedReader bufferedReader = new BufferedReader(reader);
849
850    // We remove items from the front of the list, so a linked list works best.
851    final List<String> lines = new LinkedList<String>();
852
853    String line;
854    while ((line = bufferedReader.readLine()) != null)
855    {
856      lines.add(line);
857    }
858
859    return lines;
860  }
861}
862