001/* 002 * Copyright 2009-2014 UnboundID Corp. 003 * All Rights Reserved. 004 */ 005/* 006 * Copyright (C) 2009-2014 UnboundID Corp. 007 * 008 * This program is free software; you can redistribute it and/or modify 009 * it under the terms of the GNU General Public License (GPLv2 only) 010 * or the terms of the GNU Lesser General Public License (LGPLv2.1 only) 011 * as published by the Free Software Foundation. 012 * 013 * This program is distributed in the hope that it will be useful, 014 * but WITHOUT ANY WARRANTY; without even the implied warranty of 015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 016 * GNU General Public License for more details. 017 * 018 * You should have received a copy of the GNU General Public License 019 * along with this program; if not, see <http://www.gnu.org/licenses>. 020 */ 021package com.unboundid.util; 022 023 024 025import java.io.Serializable; 026import java.util.ArrayList; 027import java.util.Collections; 028import java.util.List; 029import java.util.logging.Level; 030 031import static com.unboundid.util.Debug.*; 032 033 034 035/** 036 * Instances of this class are used to ensure that certain actions are performed 037 * at a fixed rate per interval (e.g. 10000 search operations per second). 038 * <p> 039 * Once a class is constructed with the duration of an interval and the target 040 * per interval, the {@link #await} method only releases callers at the 041 * specified number of times per interval. This class is most useful when 042 * the target number per interval exceeds the limits of other approaches 043 * such as {@code java.util.Timer} or 044 * {@code java.util.concurrent.ScheduledThreadPoolExecutor}. For instance, 045 * this does a good job of ensuring that something happens about 10000 times 046 * per second, but it's overkill to ensure something happens five times per 047 * hour. This does come at a cost. In the worst case, a single thread is 048 * tied up in a loop doing a small amount of computation followed by a 049 * Thread.yield(). Calling Thread.sleep() is not possible because many 050 * platforms sleep for a minimum of 10ms, and all platforms require sleeping 051 * for at least 1ms. 052 * <p> 053 * Testing has shown that this class is accurate for a "no-op" 054 * action up to two million per second, which vastly exceeds its 055 * typical use in tools such as {@code searchrate} and {@code modrate}. This 056 * class is designed to be called by multiple threads, however, it does not 057 * make any fairness guarantee between threads; a single-thread might be 058 * released from the {@link #await} method many times before another thread 059 * that is blocked in that method. 060 * <p> 061 * This class attempts to smooth out the target per interval throughout each 062 * interval. At a given ratio, R between 0 and 1, through the interval, the 063 * expected number of actions to have been performed in the interval at that 064 * time is R times the target per interval. That is, 10% of the way through 065 * the interval, approximately 10% of the actions have been performed, and 066 * 80% of the way through the interval, 80% of the actions have been performed. 067 */ 068@ThreadSafety(level=ThreadSafetyLevel.COMPLETELY_THREADSAFE) 069public final class FixedRateBarrier 070 implements Serializable 071{ 072 /** 073 * The serial version UID for this serializable class. 074 */ 075 private static final long serialVersionUID = -3490156685189909611L; 076 077 /** 078 * The minimum number of milliseconds that Thread.sleep() can handle 079 * accurately. This varies from platform to platform, so we measure it 080 * once in the static initializer below. When using a low rate (such as 081 * 100 per second), we can often sleep between iterations instead of having 082 * to spin calling Thread.yield(). 083 */ 084 private static final long minSleepMillis; 085 086 static 087 { 088 // Calibrate the minimum number of milliseconds that we can reliably 089 // sleep on this system. We take several measurements and take the median, 090 // which keeps us from choosing an outlier. 091 // 092 // It varies from system to system. Testing on three systems, yielded 093 // three different measurements Solaris x86 (10 ms), RedHat Linux (2 ms), 094 // Windows 7 (1 ms). 095 096 final List<Long> minSleepMillisMeasurements = new ArrayList<Long>(); 097 098 for (int i = 0; i < 11; i++) 099 { 100 final long timeBefore = System.currentTimeMillis(); 101 try 102 { 103 Thread.sleep(1); 104 } 105 catch (InterruptedException e) 106 { 107 debugException(e); 108 } 109 final long sleepMillis = System.currentTimeMillis() - timeBefore; 110 minSleepMillisMeasurements.add(sleepMillis); 111 } 112 113 Collections.sort(minSleepMillisMeasurements); 114 final long medianSleepMillis = minSleepMillisMeasurements.get( 115 minSleepMillisMeasurements.size()/2); 116 117 minSleepMillis = Math.max(medianSleepMillis, 1); 118 119 final String message = "Calibrated FixedRateBarrier to use " + 120 "minSleepMillis=" + minSleepMillis + ". " + 121 "Minimum sleep measurements = " + minSleepMillisMeasurements; 122 debug(Level.INFO, DebugType.OTHER, message); 123 } 124 125 126 // This tracks when this class is shut down. Calls to await() after 127 // shutdownRequested() is called, will return immediately with a value of 128 // true. 129 private volatile boolean shutdownRequested = false; 130 131 132 // 133 // The following class variables are guarded by synchronized(this). 134 // 135 136 // The duration of the target interval in nano-seconds. 137 private long intervalDurationNanos; 138 139 // This tracks the number of milliseconds between each iteration if they 140 // were evenly spaced. 141 // 142 // If intervalDurationMs=1000 and perInterval=100, then this is 100. 143 // If intervalDurationMs=1000 and perInterval=10000, then this is .1. 144 private double millisBetweenIterations; 145 146 // The target number of times to release a thread per interval. 147 private int perInterval; 148 149 // A count of the number of times that await has returned within the current 150 // interval. 151 private long countInThisInterval; 152 153 // The start of this interval in terms of System.nanoTime(). 154 private long intervalStartNanos; 155 156 // The end of this interval in terms of System.nanoTime(). 157 private long intervalEndNanos; 158 159 160 161 /** 162 * Constructs a new FixedRateBarrier, which is active until 163 * {@link #shutdownRequested} is called. 164 * 165 * @param intervalDurationMs The duration of the interval in milliseconds. 166 * @param perInterval The target number of times that {@link #await} should 167 * return per interval. 168 */ 169 public FixedRateBarrier(final long intervalDurationMs, final int perInterval) 170 { 171 setRate(intervalDurationMs, perInterval); 172 } 173 174 175 176 /** 177 * Updates the rates associated with this FixedRateBarrier. The new rate 178 * will be in effect when this method returns. 179 * 180 * @param intervalDurationMs The duration of the interval in milliseconds. 181 * @param perInterval The target number of times that {@link #await} should 182 * return per interval. 183 */ 184 public synchronized void setRate(final long intervalDurationMs, 185 final int perInterval) 186 { 187 Validator.ensureTrue(intervalDurationMs > 0, 188 "FixedRateBarrier.intervalDurationMs must be at least 1."); 189 Validator.ensureTrue(perInterval > 0, 190 "FixedRateBarrier.perInterval must be at least 1."); 191 192 this.perInterval = perInterval; 193 194 intervalDurationNanos = 1000L * 1000L * intervalDurationMs; 195 196 millisBetweenIterations = (double)intervalDurationMs/(double)perInterval; 197 198 // Reset the intervals and all of the counters. 199 countInThisInterval = 0; 200 intervalStartNanos = 0; 201 intervalEndNanos = 0; 202 } 203 204 205 206 /** 207 * This method waits until it is time for the next 'action' to be performed 208 * based on the specified interval duration and target per interval. This 209 * method can be called by multiple threads simultaneously. This method 210 * returns immediately if shutdown has been requested. 211 * 212 * @return {@code true} if shutdown has been requested and {@code} false 213 * otherwise. 214 */ 215 public synchronized boolean await() 216 { 217 // Loop forever until we are requested to shutdown or it is time to perform 218 // the next 'action' in which case we break from the loop. 219 while (!shutdownRequested) 220 { 221 final long now = System.nanoTime(); 222 223 if ((intervalStartNanos == 0) || // Handles the first time we're called. 224 (now < intervalStartNanos)) // Handles a change in the clock. 225 { 226 intervalStartNanos = now; 227 intervalEndNanos = intervalStartNanos + intervalDurationNanos; 228 } 229 else if (now >= intervalEndNanos) // End of an interval. 230 { 231 countInThisInterval = 0; 232 233 if (now < (intervalEndNanos + intervalDurationNanos)) 234 { 235 // If we have already passed the end of the next interval, then we 236 // don't try to catch up. Instead we just reset the start of the 237 // next interval to now. This could happen if the system clock 238 // was set to the future, we're running in a debugger, or we have 239 // very short intervals and are unable to keep up. 240 intervalStartNanos = now; 241 } 242 else 243 { 244 // Usually we're some small fraction into the next interval, so 245 // we set the start of the current interval to the end of the 246 // previous one. 247 intervalStartNanos = intervalEndNanos; 248 } 249 intervalEndNanos = intervalStartNanos + intervalDurationNanos; 250 } 251 252 final long intervalRemaining = intervalEndNanos - now; 253 if (intervalRemaining <= 0) 254 { 255 // This shouldn't happen, but we're careful not to divide by 0. 256 continue; 257 } 258 259 final double intervalFractionRemaining = 260 (double) intervalRemaining / intervalDurationNanos; 261 262 final double expectedRemaining = intervalFractionRemaining * perInterval; 263 final long actualRemaining = perInterval - countInThisInterval; 264 265 if (actualRemaining >= expectedRemaining) 266 { 267 // We are on schedule or behind schedule so let the next 'action' 268 // happen. 269 countInThisInterval++; 270 break; 271 } 272 else 273 { 274 // If we can sleep until it's time to leave this barrier, then do 275 // so to keep from spinning on a CPU doing Thread.yield(). 276 277 final double gapIterations = expectedRemaining - actualRemaining; 278 final long remainingMillis = 279 (long) Math.floor(millisBetweenIterations * gapIterations); 280 281 if (remainingMillis >= minSleepMillis) 282 { 283 // Cap how long we sleep so that we can respond to a change in the 284 // rate without too much delay. 285 final long waitTime = Math.min(remainingMillis, 10); 286 try 287 { 288 // We need to wait here instead of Thread.sleep so that we don't 289 // block setRate. 290 this.wait(waitTime); 291 } 292 catch (InterruptedException e) 293 { 294 debugException(e); 295 } 296 } 297 else 298 { 299 // We're ahead of schedule so yield to other threads, and then try 300 // again. Note: this is the most costly part of the algorithm because 301 // we have to busy wait due to the lack of sleeping for very small 302 // amounts of time. 303 Thread.yield(); 304 } 305 } 306 } 307 308 return shutdownRequested; 309 } 310 311 312 313 /** 314 * Retrieves information about the current target rate for this barrier. The 315 * value returned will include a {@code Long} that specifies the duration of 316 * the current interval in milliseconds and an {@code Integer} that specifies 317 * the number of times that the {@link #await} method should return per 318 * interval. 319 * 320 * @return Information about hte current target rate for this barrier. 321 */ 322 public synchronized ObjectPair<Long,Integer> getTargetRate() 323 { 324 return new ObjectPair<Long,Integer>( 325 (intervalDurationNanos / (1000L * 1000L)), 326 perInterval); 327 } 328 329 330 331 /** 332 * Shuts down this barrier. Future calls to await() will return immediately. 333 */ 334 public void shutdownRequested() 335 { 336 shutdownRequested = true; 337 } 338 339 340 341 /** 342 * Returns {@code true} if shutdown has been requested. 343 * 344 * @return {@code true} if shutdown has been requested and {@code false} 345 * otherwise. 346 */ 347 public boolean isShutdownRequested() 348 { 349 return shutdownRequested; 350 } 351}