public class EventBus extends Object implements Bus<Event<?>>, Consumer<Event<?>>

A reactor is an event gateway that allows other components to register Event Consumers that can
subsequently be notified of events. A consumer is typically registered with a Selector which, by matching on
the notification key, governs which events the consumer will receive. When a Reactor is notified of
an Event, a task is dispatched using the reactor's Dispatcher, which causes it to be executed on a
thread determined by the Dispatcher implementation in use.

Modifier and Type | Class and Description |
---|---|
class | EventBus.ReplyToConsumer<E extends Event<?>,V> |
static class | EventBus.ReplyToEvent<T> |
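
The selector-and-key matching described in the class overview can be illustrated with a minimal sketch. It uses only the JDK: `MiniBus` is a hypothetical stand-in, a `Predicate` on the key plays the role of a Selector, and delivery happens on the caller's thread. It is not the Reactor implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical stand-in for illustration only; not the Reactor EventBus.
final class MiniBus {
    // One registration: a selector (modelled as a Predicate on the key) plus its consumer.
    private static final class Entry {
        final Predicate<Object> selector;
        final Consumer<Object> consumer;

        Entry(Predicate<Object> selector, Consumer<Object> consumer) {
            this.selector = selector;
            this.consumer = consumer;
        }
    }

    private final List<Entry> entries = new ArrayList<>();

    // Register a consumer; the selector decides which notification keys it receives.
    void on(Predicate<Object> selector, Consumer<Object> consumer) {
        entries.add(new Entry(selector, consumer));
    }

    // True if at least one registered selector matches the key.
    boolean respondsToKey(Object key) {
        for (Entry e : entries) {
            if (e.selector.test(key)) {
                return true;
            }
        }
        return false;
    }

    // Deliver the event to every consumer whose selector matches the key.
    // Runs on the caller's thread, as a synchronous dispatcher would.
    void notify(Object key, Object event) {
        for (Entry e : entries) {
            if (e.selector.test(key)) {
                e.consumer.accept(event);
            }
        }
    }
}
```

With this sketch, a consumer registered with `k -> "greeting".equals(k)` receives events notified under `"greeting"` and nothing else.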
Constructor and Description |
---|
EventBus(Dispatcher dispatcher) Create a new Reactor that uses the given Dispatcher. |
EventBus(Dispatcher dispatcher, Router router) Create a new Reactor that uses the given Dispatcher. |
EventBus(Dispatcher dispatcher, Router router, Consumer<Throwable> dispatchErrorHandler, Consumer<Throwable> uncaughtErrorHandler) |
EventBus(Registry<Object,Consumer<? extends Event<?>>> consumerRegistry, Dispatcher dispatcher, Router router, Consumer<Throwable> dispatchErrorHandler, Consumer<Throwable> uncaughtErrorHandler) Create a new Reactor that uses the given dispatcher and router. |
Modifier and Type | Method and Description |
---|---|
void | accept(Event<?> event) Execute the logic of the action, accepting the given parameter. |
static EventBusSpec | config() Create a new EventBusSpec to configure a Reactor. |
static EventBus | create() Create a new synchronous EventBus. |
static EventBus | create(Dispatcher dispatcher) Create a new EventBus using the given Dispatcher. |
static EventBus | create(Environment env) Create a new EventBus using the given Environment. |
static EventBus | create(Environment env, Dispatcher dispatcher) |
static EventBus | create(Environment env, String dispatcher) Create a new EventBus using the given Environment and dispatcher name. |
Registry<Object,Consumer<? extends Event<?>>> | getConsumerRegistry() |
Dispatcher | getDispatcher() Get the Dispatcher currently in use. |
Consumer<Throwable> | getDispatchErrorHandler() |
UUID | getId() Get the unique, time-based UUID of this Reactor. |
Router | getRouter() |
Consumer<Throwable> | getUncaughtErrorHandler() |
EventBus | notify(Object key) Notify this component that the consumers registered with a Selector that matches the key should be triggered with a null input argument. |
EventBus | notify(Object key, Event<?> ev) Notify this component that an Event is ready to be processed. |
EventBus | notify(Object key, Supplier<? extends Event<?>> supplier) Notify this component that the given Supplier can provide an event that's ready to be processed. |
EventBus | notify(org.reactivestreams.Publisher<?> source, Object key) Pass values emitted by the given Publisher into this Bus, notifying with the given key. |
<T> EventBus | notify(org.reactivestreams.Publisher<? extends T> source, Function<? super T,?> keyMapper) Pass values emitted by the given Publisher into this Bus, notifying each value with the key produced by the given key mapper. |
org.reactivestreams.Publisher<? extends Event<?>> | on(Selector broadcastSelector) |
<T extends Event<?>> Registration<Object,Consumer<? extends Event<?>>> | on(Selector selector, Consumer<T> consumer) |
<T> Consumer<Event<T>> | prepare(Object key) Create an optimized path for publishing notifications to the given key. |
<T extends Event<?>,V> Registration<?,Consumer<? extends Event<?>>> | receive(Selector sel, Function<T,V> fn) |
boolean | respondsToKey(Object key) |
<T> void | schedule(Consumer<T> consumer, T data) Schedule an arbitrary Consumer to be executed on the current Reactor Dispatcher, passing the given data. |
EventBus | send(Object key, Event<?> ev) |
EventBus | send(Object key, Event<?> ev, Bus replyTo) |
EventBus | send(Object key, Supplier<? extends Event<?>> supplier) |
EventBus | send(Object key, Supplier<? extends Event<?>> supplier, Bus replyTo) |
<T extends Event<?>> EventBus | sendAndReceive(Object key, Event<?> event, Consumer<T> reply) |
<T extends Event<?>> EventBus | sendAndReceive(Object key, Supplier<? extends Event<?>> supplier, Consumer<T> reply) |

public EventBus(@Nullable Dispatcher dispatcher)
Create a new Reactor that uses the given Dispatcher. The reactor will use a default Router that broadcasts events to all of the registered consumers that match the notification key and does not perform any type conversion.
Parameters:
dispatcher - The Dispatcher to use. May be null, in which case a new SynchronousDispatcher is used.

public EventBus(@Nullable Dispatcher dispatcher, @Nullable Router router)
Create a new Reactor that uses the given Dispatcher. The reactor will use a default CachingRegistry.
Parameters:
dispatcher - The Dispatcher to use. May be null, in which case a new synchronous dispatcher is used.
router - The Router used to route events to Consumers. May be null, in which case the default event router is used; it broadcasts events to all of the registered consumers that match the notification key and does not perform any type conversion.

public EventBus(@Nullable Dispatcher dispatcher, @Nullable Router router, @Nullable Consumer<Throwable> dispatchErrorHandler, @Nullable Consumer<Throwable> uncaughtErrorHandler)

public EventBus(@Nonnull Registry<Object,Consumer<? extends Event<?>>> consumerRegistry, @Nullable Dispatcher dispatcher, @Nullable Router router, @Nullable Consumer<Throwable> dispatchErrorHandler, @Nullable Consumer<Throwable> uncaughtErrorHandler)
Create a new Reactor that uses the given dispatcher and router.
Parameters:
dispatcher - The Dispatcher to use. May be null, in which case a new synchronous dispatcher is used.
router - The Router used to route events to Consumers. May be null, in which case the default event router is used; it broadcasts events to all of the registered consumers that match the notification key and does not perform any type conversion.
consumerRegistry - The Registry to be used to match Selectors and dispatch to Consumers.

public static EventBusSpec config()
Create a new EventBusSpec to configure a Reactor.

public static EventBus create(Environment env)
Create a new EventBus using the given Environment.
Parameters:
env - The Environment to use.
Returns:
A new EventBus

public static EventBus create(Dispatcher dispatcher)
Create a new EventBus using the given Dispatcher.
Parameters:
dispatcher - The Dispatcher to use.
Returns:
A new EventBus

public static EventBus create(Environment env, String dispatcher)
Create a new EventBus using the given Environment and dispatcher name.
Parameters:
env - The Environment to use.
dispatcher - The name of the Dispatcher to use.
Returns:
A new EventBus

public static EventBus create(Environment env, Dispatcher dispatcher)
Parameters:
env - The Environment to use.
dispatcher - The Dispatcher to use.
Returns:
A new EventBus
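
The constructors and factory methods above accept a Dispatcher that may be null, in which case a synchronous dispatcher is used. The fallback can be sketched as follows, assuming `java.util.concurrent.Executor` as a stand-in for Dispatcher. `DispatchingBus` is a hypothetical name, not a Reactor type.

```java
import java.util.concurrent.Executor;
import java.util.function.Consumer;

// Hypothetical stand-in: java.util.concurrent.Executor plays the role of Reactor's Dispatcher.
final class DispatchingBus {
    private final Executor dispatcher;

    // A null dispatcher falls back to running tasks directly on the caller's thread.
    DispatchingBus(Executor dispatcher) {
        this.dispatcher = (dispatcher != null) ? dispatcher : Runnable::run;
    }

    // Hand the consumer and its data to the dispatcher, which decides the executing thread.
    <T> void schedule(Consumer<T> consumer, T data) {
        dispatcher.execute(() -> consumer.accept(data));
    }
}
```

Passing an executor backed by a thread pool instead of null would move execution off the caller's thread without changing the calling code, which is the design point the Dispatcher abstraction serves.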

public UUID getId()
Get the unique, time-based UUID of this Reactor.
Returns:
The UUID of this Reactor.

public Registry<Object,Consumer<? extends Event<?>>> getConsumerRegistry()
Get the Registry in use.

public Dispatcher getDispatcher()
Get the Dispatcher currently in use.
Returns:
The Dispatcher.

public boolean respondsToKey(Object key)
Description copied from interface: Bus
Specified by:
respondsToKey in interface Bus<Event<?>>
Parameters:
key - The key to be matched by Selectors

public <T extends Event<?>> Registration<Object,Consumer<? extends Event<?>>> on(Selector selector, Consumer<T> consumer)
Description copied from interface: Bus
Specified by:
on in interface Bus<Event<?>>
Parameters:
selector - The Selector to be used for matching
consumer - The Consumer to be triggered
Returns:
A Registration object that allows the caller to interact with the given mapping

public org.reactivestreams.Publisher<? extends Event<?>> on(Selector broadcastSelector)
Parameters:
broadcastSelector - the Selector/Object tuple to listen to
Returns:
a Publisher
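
The `on` method above returns a Registration object that lets the caller interact with the mapping. The following minimal sketch illustrates that idea, assuming the interaction of interest is cancelling the mapping. `SimpleRegistry` and `Handle` are hypothetical stand-ins, not Reactor's Registry or Registration types, and keys are matched by plain equality.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical stand-ins for illustration; not Reactor's Registry or Registration types.
final class SimpleRegistry {
    // Handle returned to the caller so the mapping can be removed later.
    interface Handle {
        void cancel();
    }

    // A key-to-consumer mapping held by the registry.
    private static final class Mapping {
        final Object key;
        final Consumer<String> consumer;

        Mapping(Object key, Consumer<String> consumer) {
            this.key = key;
            this.consumer = consumer;
        }
    }

    private final List<Mapping> mappings = new CopyOnWriteArrayList<>();

    // Register the consumer and return a handle that removes exactly this mapping.
    Handle on(Object key, Consumer<String> consumer) {
        Mapping m = new Mapping(key, consumer);
        mappings.add(m);
        return () -> mappings.remove(m);
    }

    // Deliver the event to every consumer still registered under an equal key.
    void notify(Object key, String event) {
        for (Mapping m : mappings) {
            if (m.key.equals(key)) {
                m.consumer.accept(event);
            }
        }
    }
}
```

After `cancel()` is called on the handle, later notifications under the same key no longer reach that consumer.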

public EventBus notify(Object key, Event<?> ev)
Description copied from interface: Bus
Notify this component that an Event is ready to be processed.

public final EventBus notify(@Nonnull org.reactivestreams.Publisher<?> source, @Nonnull Object key)
Pass values emitted by the given Publisher into this Bus, notifying with the given key.
Parameters:
key - the key to notify on
source - the Publisher to consume

public final <T> EventBus notify(@Nonnull org.reactivestreams.Publisher<? extends T> source, @Nonnull Function<? super T,?> keyMapper)
Pass values emitted by the given Publisher into this Bus, notifying each value with the key produced by the given key mapper.
Parameters:
source - the Publisher to consume
keyMapper - the key function mapping each incoming value to a key to notify on

public <T extends Event<?>,V> Registration<?,Consumer<? extends Event<?>>> receive(Selector sel, Function<T,V> fn)
Parameters:
sel - The Selector to be used for matching
fn - The transformative Function to call to receive an Event
Returns:
A Registration object that allows the caller to interact with the given mapping

public EventBus notify(Object key, Supplier<? extends Event<?>> supplier)
Notify this component that the given Supplier can provide an event that's ready to be processed.

public EventBus notify(Object key)
Notify this component that the consumers registered with a Selector that matches the key should be triggered with a null input argument.
Parameters:
key - The key to be matched by Selectors
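
The supplier and key-mapper variants of `notify` above can be illustrated with a small JDK-only sketch. `RecordingBus`, `notifySupplied` and `notifyEach` are hypothetical names. An `Iterable` stands in for the Reactive Streams Publisher, and notifications are recorded per key rather than dispatched to consumers.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical stand-in: records notifications per key instead of invoking consumers.
final class RecordingBus {
    final Map<Object, List<Object>> delivered = new HashMap<>();

    // Plain variant: the event already exists.
    RecordingBus notify(Object key, Object event) {
        delivered.computeIfAbsent(key, k -> new ArrayList<>()).add(event);
        return this;
    }

    // Supplier variant: the event is only created when the notification happens.
    RecordingBus notifySupplied(Object key, Supplier<?> supplier) {
        return notify(key, supplier.get());
    }

    // Key-mapper variant: each value from the source is notified under the key derived from it.
    // An Iterable stands in for the Reactive Streams Publisher here.
    <T> RecordingBus notifyEach(Iterable<? extends T> source, Function<? super T, ?> keyMapper) {
        for (T value : source) {
            notify(keyMapper.apply(value), value);
        }
        return this;
    }
}
```

For example, mapping each string to its first letter routes "apple" and "avocado" under key "a" and "banana" under key "b".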

public EventBus send(Object key, Event<?> ev)
Notify this component of the given Event and register an internal Consumer that will take the output of a previously-registered Function and respond using the key set on the Event's replyTo property.
Parameters:
key - The key to be matched by Selectors
ev - The Event

public <T extends Event<?>> EventBus sendAndReceive(Object key, Event<?> event, Consumer<T> reply)
Register the given Consumer on an anonymous Selector and set the given event's replyTo property to the corresponding anonymous key, then register the consumer to receive replies from the Function assigned to handle the given key.
Parameters:
key - The key to be matched by Selectors
event - The event to notify.
reply - The consumer to register as a reply handler.

public <T extends Event<?>> EventBus sendAndReceive(Object key, Supplier<? extends Event<?>> supplier, Consumer<T> reply)
Register the given Consumer on an anonymous Selector and set the event's replyTo property to the corresponding anonymous key, then register the consumer to receive replies from the Function assigned to handle the given key.
Parameters:
key - The key to be matched by Selectors
supplier - The supplier to supply the event.
reply - The consumer to register as a reply handler.

public <T> Consumer<Event<T>> prepare(Object key)
Create an optimized path for publishing notifications to the given key.
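
The request/reply flow described for `receive`, `send` and `sendAndReceive` can be sketched in plain Java. This is a simplified model under stated assumptions: `ReplyBus` and `Msg` are hypothetical types, keys are matched by equality with one consumer per key, the anonymous key is a random UUID, and delivery is synchronous. It is not how Reactor implements these methods.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical stand-in for illustration; not Reactor's send/receive implementation.
final class ReplyBus {
    // Minimal event: a payload plus the key that replies should be sent to.
    static final class Msg {
        final Object data;
        Object replyTo;

        Msg(Object data) {
            this.data = data;
        }
    }

    private final Map<Object, Consumer<Msg>> consumers = new HashMap<>();

    void on(Object key, Consumer<Msg> consumer) {
        consumers.put(key, consumer);
    }

    void notify(Object key, Msg msg) {
        Consumer<Msg> c = consumers.get(key);
        if (c != null) {
            c.accept(msg);
        }
    }

    // Register a function whose output is sent back under the event's replyTo key.
    void receive(Object key, Function<Msg, Object> fn) {
        on(key, msg -> {
            Object result = fn.apply(msg);
            if (msg.replyTo != null) {
                notify(msg.replyTo, new Msg(result));
            }
        });
    }

    // Register the reply consumer under a fresh anonymous key, set that key on
    // the event as replyTo, then notify the handler registered for the request key.
    void sendAndReceive(Object key, Msg msg, Consumer<Msg> reply) {
        Object anonymousKey = UUID.randomUUID();
        on(anonymousKey, reply);
        msg.replyTo = anonymousKey;
        notify(key, msg);
    }
}
```

In this sketch the reply consumer stays registered under its anonymous key after the reply arrives; cleaning it up is left out for brevity.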

public <T> void schedule(Consumer<T> consumer, T data)
Schedule an arbitrary Consumer to be executed on the current Reactor Dispatcher, passing the given data.
Type Parameters:
T - The type of the data.
Parameters:
consumer - The Consumer to invoke.
data - The data to pass to the consumer.

Copyright © 2016. All rights reserved.