class Raindrops::Aggregate::PMQ

Aggregate + POSIX message queues support for Ruby 1.9 and Linux

This class is duck-type compatible with Aggregate and allows us to aggregate and share statistics from multiple processes/threads with the help of POSIX message queues. It is designed to be used with the Raindrops::LastDataRecv Rack application, but can be used independently on compatible runtimes.

Unlike the core of raindrops, this is only supported on Ruby 1.9 and Linux 2.6. Using this class requires additional RubyGems or libraries, including the aggregate and posix_mq gems that provide the Aggregate and POSIX_MQ classes used below.

Design

There is one master thread which aggregates statistics. Individual worker processes or threads write to a shared POSIX message queue (default: “/raindrops”) that the master reads from. At a predefined interval, the master thread writes out to a shared, anonymous temporary file that workers may read from.

Setting :worker_interval and :master_interval to 1 will result in perfect accuracy but at the cost of a high synchronization overhead. Larger intervals mean less frequent messaging for higher performance but lower accuracy.
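
To make the trade-off concrete, here is a minimal sketch of worker-side batching in plain Ruby. It does not touch POSIX message queues: BatchingWorker and its sender callable are hypothetical stand-ins for a worker and its queue, used only to count how many messages a given interval produces and how many samples remain unsent.

```ruby
# Hypothetical stand-in for a worker: buffers samples and hands them to
# +sender+ (any callable) once +interval+ samples have accumulated.
class BatchingWorker
  attr_reader :messages_sent

  def initialize(interval, sender)
    @interval = interval
    @sender = sender
    @buffer = []
    @messages_sent = 0
  end

  def <<(sample)
    @buffer << sample
    return self if @buffer.size < @interval
    @sender.call(@buffer.dup) # one message carries the whole batch
    @messages_sent += 1
    @buffer.clear
    self
  end

  # samples buffered locally that the master has not seen yet
  def pending
    @buffer.size
  end
end

received = []
precise = BatchingWorker.new(1, received.method(:concat))
batched = BatchingWorker.new(10, ->(batch) { })

25.times { |i| precise << i; batched << i }

puts precise.messages_sent # 25 messages, nothing pending
puts batched.messages_sent # 2 messages
puts batched.pending       # 5 samples still buffered in the worker
```

With an interval of 1 every sample costs one message; with an interval of 10 the same 25 samples cost two messages, but the last five are invisible to the master until the buffer fills.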

Attributes

nr_dropped[R]

returns the number of messages that were dropped instead of being sent to the POSIX message queue. This is only tracked if non-blocking operation was requested with :lossy
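
The drop counting can be sketched without a real message queue. In the example below, Ruby's SizedQueue stands in for a bounded POSIX message queue and the trysend lambda mimics a non-blocking send that returns false when the queue is full. Both are assumptions for illustration, not the posix_mq API.

```ruby
# SizedQueue stands in for a bounded POSIX message queue (assumption for
# illustration only); it holds at most 2 pending messages here.
queue = SizedQueue.new(2)

# Non-blocking send: returns true on success, false if the queue is full.
trysend = lambda do |msg|
  begin
    queue.push(msg, true) # second argument requests non-blocking mode
    true
  rescue ThreadError      # raised when the queue is full
    false
  end
end

nr_dropped = 0
5.times { |i| trysend.call(i) or nr_dropped += 1 }

puts queue.size # 2 messages were accepted
puts nr_dropped # 3 messages were dropped because nobody was reading
```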

Public Class Methods

new(params = {})

Creates a new Raindrops::Aggregate::PMQ object

Raindrops::Aggregate::PMQ.new(options = {})  -> aggregate

options is a hash that accepts the following keys:

  • :queue - name of the POSIX message queue (default: “/raindrops”)

  • :worker_interval - number of samples a worker buffers before sending them to the master (default: 10)

  • :master_interval - interval for the master to write out (default: 5)

  • :lossy - workers drop packets if master cannot keep up (default: false)

  • :aggregate - Aggregate object (default: Aggregate.new)

  • :mq_umask - umask for creating the POSIX message queue (default: 0666)

# File lib/raindrops/aggregate/pmq.rb, line 64
def initialize(params = {})
  opts = {
    :queue => ENV["RAINDROPS_MQUEUE"] || "/raindrops",
    :worker_interval => 10,
    :master_interval => 5,
    :lossy => false,
    :mq_attr => nil,
    :mq_umask => 0666,
    :aggregate => Aggregate.new,
  }.merge! params
  @master_interval = opts[:master_interval]
  @worker_interval = opts[:worker_interval]
  @aggregate = opts[:aggregate]
  @worker_queue = @worker_interval ? [] : nil
  @mutex = Mutex.new

  @mq_name = opts[:queue]
  mq = POSIX_MQ.new @mq_name, :w, opts[:mq_umask], opts[:mq_attr]
  Tempfile.open("raindrops_pmq") do |t|
    @wr = File.open(t.path, "wb")
    @rd = File.open(t.path, "rb")
  end
  @cached_aggregate = @aggregate
  flush_master
  @mq_send = if opts[:lossy]
    @nr_dropped = 0
    mq.nonblock = true
    mq.method :trysend
  else
    mq.method :send
  end
end
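
The option handling in the constructor can be tried in isolation. The sketch below uses a hypothetical helper, pmq_options, that mirrors the defaults hash from the listing above (minus :aggregate, so no extra gems are needed). It shows that caller-supplied keys override the defaults and that the RAINDROPS_MQUEUE environment variable changes the default queue name.

```ruby
# Hypothetical helper mirroring the defaults hash in initialize; the
# :aggregate entry is left out so the sketch needs no extra gems.
def pmq_options(params = {}, env = ENV)
  {
    :queue => env["RAINDROPS_MQUEUE"] || "/raindrops",
    :worker_interval => 10,
    :master_interval => 5,
    :lossy => false,
    :mq_attr => nil,
    :mq_umask => 0666,
  }.merge(params)
end

defaults = pmq_options({}, {})
custom = pmq_options({ :lossy => true, :worker_interval => 1 },
                     { "RAINDROPS_MQUEUE" => "/my_app" })

puts defaults[:queue]         # /raindrops
puts custom[:queue]           # /my_app
puts custom[:worker_interval] # 1
puts custom[:master_interval] # 5
```

Judging by the listing, passing :worker_interval => nil leaves @worker_queue unset, so every sample is sent as its own message.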

Public Instance Methods

<<(val)

adds a sample to the underlying Aggregate object

# File lib/raindrops/aggregate/pmq.rb, line 98
def << val
  if q = @worker_queue
    q << val
    if q.size >= @worker_interval
      mq_send(q) or @nr_dropped += 1
      q.clear
    end
  else
    mq_send(val) or @nr_dropped += 1
  end
end
aggregate()

Loads the last shared Aggregate from the master thread/process

# File lib/raindrops/aggregate/pmq.rb, line 149
def aggregate
  @cached_aggregate ||= begin
    flush
    Marshal.load(synchronize(@rd, RDLOCK) do |rd|
      IO.pread rd.fileno, rd.stat.size, 0
    end)
  end
end
count()

proxy for Aggregate#count

# File lib/raindrops/aggregate/pmq.rb, line 207
def count; aggregate.count; end
each() { |*args| ... }

proxy for Aggregate#each

# File lib/raindrops/aggregate/pmq.rb, line 234
def each; aggregate.each { |*args| yield(*args) }; end
each_nonzero() { |*args| ... }

proxy for Aggregate#each_nonzero

# File lib/raindrops/aggregate/pmq.rb, line 237
def each_nonzero; aggregate.each_nonzero { |*args| yield(*args) }; end
flush()

flushes the local queue of the worker process, sending all pending data to the master. There is no need to call this explicitly as :worker_interval defines how frequently your queue will be flushed.

# File lib/raindrops/aggregate/pmq.rb, line 198
def flush
  # send any samples still buffered in the worker-side queue
  q = @worker_queue
  if q && ! q.empty?
    mq_send q
    q.clear
  end
  nil
end
flush_master()

Flushes the currently aggregated statistics to a temporary file. There is no need to call this explicitly as :master_interval defines how frequently your data will be flushed for workers to read.

# File lib/raindrops/aggregate/pmq.rb, line 161
def flush_master
  dump = Marshal.dump @aggregate
  synchronize(@wr, WRLOCK) do |wr|
    wr.truncate 0
    IO.pwrite wr.fileno, dump, 0
  end
end
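
The write-then-read handoff through a shared file can be sketched with the standard library alone. The example below is not the library's implementation: it swaps in File#flock for the locking helper used in the listings, and write_snapshot and read_snapshot are hypothetical names. It only shows the pattern of replacing a Marshal dump under an exclusive lock and loading it back under a shared lock.

```ruby
require "tempfile"

# Two handles on the same temporary file: one for the writer (master)
# and one for readers (workers).
tmp = Tempfile.new("pmq_sketch")
wr = File.open(tmp.path, "wb")
rd = File.open(tmp.path, "rb")

# Serialize +obj+ and replace the file contents under an exclusive lock.
def write_snapshot(wr, obj)
  dump = Marshal.dump(obj)
  wr.flock(File::LOCK_EX)
  wr.truncate(0)
  wr.rewind
  wr.write(dump)
  wr.flush
ensure
  wr.flock(File::LOCK_UN)
end

# Read the whole file under a shared lock and deserialize it.
def read_snapshot(rd)
  rd.flock(File::LOCK_SH)
  rd.rewind
  Marshal.load(rd.read)
ensure
  rd.flock(File::LOCK_UN)
end

write_snapshot(wr, { :count => 3, :sum => 42 })
first = read_snapshot(rd)
write_snapshot(wr, { :count => 4, :sum => 50 })
second = read_snapshot(rd)

puts first[:sum]  # 42
puts second[:sum] # 50
```

Truncating before each write keeps a shorter dump from leaving stale bytes of a longer, older one at the end of the file.
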
master_loop()

Starts running a master loop, usually in a dedicated thread or process:

Thread.new { agg.master_loop }

Any worker can call agg.stop_master_loop to stop the master loop (possibly causing the thread or process to exit).

# File lib/raindrops/aggregate/pmq.rb, line 122
def master_loop
  buf = ""
  a = @aggregate
  nr = 0
  mq = POSIX_MQ.new @mq_name, :r # this one is always blocking
  begin
    if (nr -= 1) < 0
      nr = @master_interval
      flush_master
    end
    mq.shift(buf)
    data = begin
      Marshal.load(buf) or return
    rescue ArgumentError, TypeError
      next
    end
    Array === data ? data.each { |x| a << x } : a << data
  rescue Errno::EINTR
  rescue => e
    warn "Unhandled exception in #{__FILE__}:#{__LINE__}: #{e}"
    break
  end while true
  ensure
    flush_master
end
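
The control flow of the loop above can be followed in a self-contained sketch. Here Ruby's in-process Queue replaces the POSIX message queue, a plain array replaces the Aggregate object, and a counter replaces flush_master. All three are assumptions for illustration. As in the listing, batches arrive as arrays, single samples arrive on their own, and a false message stops the loop.

```ruby
# Queue stands in for the POSIX message queue (assumption for
# illustration); messages are Marshal dumps, as in the listing above.
mq = Queue.new
send_msg = ->(val) { mq << Marshal.dump(val) }

samples = []        # plays the role of the Aggregate object
flushes = 0         # counts how often a snapshot would be written out
master_interval = 5

master = Thread.new do
  nr = 0
  loop do
    if (nr -= 1) < 0
      nr = master_interval
      flushes += 1              # flush_master would run here
    end
    data = Marshal.load(mq.pop) # blocks until a message arrives
    break unless data           # a false message stops the loop
    data.is_a?(Array) ? samples.concat(data) : samples << data
  end
  flushes += 1                  # final flush on the way out
end

send_msg.call([1, 2, 3]) # a batch from a worker with an interval
send_msg.call(4)         # a single sample
send_msg.call(false)     # what stop_master_loop sends
master.join

puts samples.sum # 10
puts flushes     # 2 (one at startup, one at shutdown)
```
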
max()

proxy for Aggregate#max

# File lib/raindrops/aggregate/pmq.rb, line 210
def max; aggregate.max; end
mean()

proxy for Aggregate#mean

# File lib/raindrops/aggregate/pmq.rb, line 219
def mean; aggregate.mean; end
min()

proxy for Aggregate#min

# File lib/raindrops/aggregate/pmq.rb, line 213
def min; aggregate.min; end
outliers_high()

proxy for Aggregate#outliers_high

# File lib/raindrops/aggregate/pmq.rb, line 228
def outliers_high; aggregate.outliers_high; end
outliers_low()

proxy for Aggregate#outliers_low

# File lib/raindrops/aggregate/pmq.rb, line 225
def outliers_low; aggregate.outliers_low; end
std_dev()

proxy for Aggregate#std_dev

# File lib/raindrops/aggregate/pmq.rb, line 222
def std_dev; aggregate.std_dev; end
stop_master_loop()

stops the currently running master loop; may be called from any worker thread or process

# File lib/raindrops/aggregate/pmq.rb, line 171
def stop_master_loop
  sleep 0.1 until mq_send(false)
  rescue Errno::EINTR
    retry
end
sum()

proxy for Aggregate#sum

# File lib/raindrops/aggregate/pmq.rb, line 216
def sum; aggregate.sum; end
to_s(*args)

proxy for Aggregate#to_s

# File lib/raindrops/aggregate/pmq.rb, line 231
def to_s(*args); aggregate.to_s(*args); end