class Celluloid::Mailbox

Actors communicate with asynchronous messages. Messages are buffered in Mailboxes until Actors can act upon them.

Attributes

address[R]

A unique address at which this mailbox can be found

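max_size[RW]

Maximum number of messages the mailbox will hold; once it is reached, newly added messages are discarded. Defaults to nil, which means no limit.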

Public Class Methods

new() click to toggle source
# File lib/celluloid/mailbox.rb, line 16
# Build an empty, live mailbox.
#
# The mailbox gets a fresh address from Celluloid.uuid, starts with no
# buffered messages, is not dead, and has no size limit (@max_size is nil).
def initialize
  # Identity under which this mailbox can be found.
  @address = Celluloid.uuid

  # Queue state.
  @messages = []
  @max_size = nil
  @dead     = false

  # Synchronization primitives: the mutex guards the queue state and the
  # condition variable is what #receive waits on and #<< signals.
  @mutex     = Mutex.new
  @condition = ConditionVariable.new
end

Public Instance Methods

<<(message) click to toggle source

Add a message to the Mailbox

# File lib/celluloid/mailbox.rb, line 26
# Add a message to the Mailbox.
#
# The message is handed to #dead_letter instead of being queued when the
# mailbox is full or dead. Otherwise it is queued and one waiter on the
# condition variable is signalled.
#
# message - the object to enqueue.
#
# Always returns nil.
def <<(message)
  @mutex.lock
  begin
    rejected = mailbox_full || @dead

    if rejected
      dead_letter(message)
    else
      if message.is_a?(SystemEvent)
        # SystemEvents are high priority, so they jump to the front of the
        # queue rather than waiting behind ordinary messages.
        @messages.unshift(message)
      else
        @messages.push(message)
      end

      @condition.signal
    end

    nil
  ensure
    @mutex.unlock rescue nil
  end
end
alive?() click to toggle source

Is the mailbox alive?

# File lib/celluloid/mailbox.rb, line 118
# Is the mailbox alive?
#
# Returns false once @dead is set, true otherwise.
def alive?
  @dead ? false : true
end
each(&block) click to toggle source

Iterate through the mailbox

# File lib/celluloid/mailbox.rb, line 128
# Iterate through the mailbox.
#
# Iteration runs over the array returned by #to_a rather than over the
# internal queue directly.
#
# block - called once per message.
#
# Returns whatever Array#each returns for that array.
def each(&block)
  snapshot = to_a
  snapshot.each(&block)
end
inspect() click to toggle source

Inspect the contents of the Mailbox

# File lib/celluloid/mailbox.rb, line 133
# Inspect the contents of the Mailbox.
#
# Returns a String containing the class name, the object id in hexadecimal,
# and the #inspect form of every message, comma-separated.
def inspect
  rendered = map(&:inspect).join(', ')
  "#<#{self.class}:#{object_id.to_s(16)} @messages=[#{rendered}]>"
end
next_message() { |msg| ... } click to toggle source

Retrieve the next message in the mailbox

# File lib/celluloid/mailbox.rb, line 80
# Retrieve the next message in the mailbox.
#
# Without a block, the message at the head of the queue is removed and
# returned. With a block, the first message for which the block returns a
# truthy value is removed and returned; a SystemEvent always matches, even
# if the block rejects it.
#
# This method does not take the mailbox lock itself.
#
# Returns the removed message, or nil when nothing qualifies.
def next_message
  return @messages.shift unless block_given?

  position = @messages.index do |candidate|
    yield(candidate) || candidate.is_a?(SystemEvent)
  end

  position && @messages.delete_at(position)
end
receive(timeout = nil, &block) click to toggle source

Receive a message from the Mailbox

# File lib/celluloid/mailbox.rb, line 49
# Receive a message from the Mailbox.
#
# Blocks on the mailbox's condition variable until #next_message produces a
# message or the timeout runs out.
#
# timeout - optional number of seconds to wait; nil (the default) waits
#           indefinitely.
# block   - optional filter forwarded to #next_message.
#
# Returns the received message, or nil if the timeout elapsed first.
# Raises MailboxDead if the mailbox is already dead when called.
def receive(timeout = nil, &block)
  deadline = nil

  @mutex.lock
  begin
    raise MailboxDead, "attempted to receive from a dead mailbox" if @dead

    received = next_message(&block)

    until received
      remaining = nil

      if timeout
        current = Time.now
        # The deadline is fixed on the first miss and reused on later
        # wake-ups, so repeated waits never extend the total timeout.
        deadline ||= current + timeout
        remaining = deadline - current
        return if remaining <= 0
      end

      # Releases the lock while waiting; a nil interval waits with no limit.
      @condition.wait(@mutex, remaining)
      received = next_message(&block)
    end

    received
  ensure
    @mutex.unlock rescue nil
  end
end
shutdown() { || ... } click to toggle source

Shut down this mailbox and clean up its contents

# File lib/celluloid/mailbox.rb, line 97
# Shut down this mailbox and clean up its contents.
#
# While holding the mailbox lock: yields to the optional block, detaches the
# buffered messages and marks the mailbox dead. After releasing the lock,
# every detached message is passed to #dead_letter and, if it responds to
# #cleanup, cleaned up.
#
# Returns true.
# Raises MailboxDead if the mailbox has already been shut down.
def shutdown
  @mutex.lock
  begin
    # The liveness check happens under the lock. Checking before taking the
    # lock would let a caller that was waiting on the mutex proceed against
    # a mailbox another thread had marked dead in the meantime.
    raise MailboxDead, "mailbox already shutdown" if @dead

    yield if block_given?
    messages = @messages
    @messages = []
    @dead = true
  ensure
    @mutex.unlock rescue nil
  end

  # Dead-lettering runs outside the lock, on the detached array.
  messages.each do |msg|
    dead_letter msg
    msg.cleanup if msg.respond_to? :cleanup
  end
  true
end
size() click to toggle source

Number of messages in the Mailbox

# File lib/celluloid/mailbox.rb, line 138
# Number of messages in the Mailbox.
#
# The count is read while holding the mailbox lock.
def size
  @mutex.synchronize do
    @messages.length
  end
end
to_a() click to toggle source

Cast to an array

# File lib/celluloid/mailbox.rb, line 123
# Cast to an array.
#
# Returns a shallow copy of the buffered messages, taken while holding the
# mailbox lock, so changes to the returned array do not affect the mailbox.
def to_a
  snapshot = nil
  @mutex.synchronize { snapshot = @messages.dup }
  snapshot
end

Private Instance Methods

dead_letter(message) click to toggle source
# File lib/celluloid/mailbox.rb, line 144
# Handle a message that was discarded instead of being delivered.
#
# Logs the message at debug level, and only when $CELLULOID_DEBUG is set;
# otherwise does nothing.
#
# message - the discarded message.
def dead_letter(message)
  return unless $CELLULOID_DEBUG

  Logger.debug "Discarded message (mailbox is dead): #{message}"
end
mailbox_full() click to toggle source
# File lib/celluloid/mailbox.rb, line 148
# Has the mailbox reached its size limit?
#
# Returns a falsy value when no @max_size is set; otherwise whether the
# number of buffered messages is at or above @max_size.
def mailbox_full
  limit = @max_size
  limit && @messages.length >= limit
end