E - Type of dispatched signal
public final class RingBufferProcessor<E> extends ExecutorPoweredProcessor<E,E>
The processor respects the Reactive Streams contract and must not be signalled concurrently on any onXXXX method. Each subscriber is assigned a unique thread that only stops on a terminal event: Complete, Error or Cancel. If Auto-Cancel is enabled, a cancel signal is sent to the upstream Publisher, if any, once all subscribers have unregistered. The Executor can be customized and defines how many concurrent subscribers are allowed (fixed thread). When a Subscriber requests Long.MAX_VALUE, no backpressure is applied and the producer runs at risk of being throttled if the subscribers don't catch up. With any other strictly positive demand, a subscriber stops reading new Next signals (Complete and Error will still be read) as soon as the demand has been fully consumed by the publisher.
When more than 1 subscriber listens to that processor, they will all receive the exact same events if their respective demand is still strictly positive, very much like a Fan-Out scenario.
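The demand and fan-out rules above can be illustrated without the processor itself. The following is a minimal single-threaded sketch, not the RingBufferProcessor implementation: FanOutSketch, DemandSubscriber, request and drain are hypothetical names introduced for illustration only. Each subscriber keeps its own read cursor, sees the same signals in the same order, and stops reading once its demand is used up, unless it requested Long.MAX_VALUE.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class FanOutSketch {

    /** Hypothetical subscriber: its own read cursor plus outstanding demand. */
    static final class DemandSubscriber {
        final List<String> received = new ArrayList<>();
        long demand; // outstanding request(n); Long.MAX_VALUE means unbounded
        int cursor;  // index of the next backlog entry this subscriber will read

        void request(long n) {
            long sum = demand + n;
            // Saturate at Long.MAX_VALUE instead of overflowing.
            demand = (sum < 0) ? Long.MAX_VALUE : sum;
        }

        /** Reads Next signals from the shared backlog until the demand is used up. */
        void drain(List<String> backlog) {
            while (cursor < backlog.size() && demand > 0) {
                received.add(backlog.get(cursor++));
                if (demand != Long.MAX_VALUE) {
                    demand--; // unbounded demand is never decremented
                }
            }
        }
    }

    static DemandSubscriber[] demo() {
        List<String> backlog = Arrays.asList("a", "b", "c", "d");
        DemandSubscriber unbounded = new DemandSubscriber();
        DemandSubscriber bounded = new DemandSubscriber();
        unbounded.request(Long.MAX_VALUE);
        bounded.request(2);

        unbounded.drain(backlog); // reads a, b, c, d
        bounded.drain(backlog);   // reads a, b and then stops

        bounded.request(1);
        bounded.drain(backlog);   // resumes where it stopped and reads c
        return new DemandSubscriber[] {unbounded, bounded};
    }

    public static void main(String[] args) {
        DemandSubscriber[] subscribers = demo();
        System.out.println(subscribers[0].received); // [a, b, c, d]
        System.out.println(subscribers[1].received); // [a, b, c]
    }
}
```

The bounded subscriber does not lose the signals it has not read yet; they stay in the backlog until it requests more.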
When the backlog has been completely booked and no subscriber is draining the signals, the publisher will start throttling. In effect, the smaller the backlog size, the smaller the difference in processing rate between subscribers must remain. Since the sequence for each subscriber points to its own ringBuffer location, the processor knows when a backlog can't override the previously occupied slot.
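The throttling rule can be sketched with plain sequence numbers. This is an illustrative model under stated assumptions, not the processor's code: BacklogSketch, tryPublish and consume are hypothetical names, and the real processor blocks or waits rather than returning false. The publisher may only claim a slot if doing so would not overwrite a signal the slowest subscriber has not read yet.

```java
import java.util.Arrays;

final class BacklogSketch {
    private final int bufferSize;
    private final long[] subscriberSequences; // last sequence read per subscriber, -1 = nothing read
    private long published = -1L;             // last sequence written by the publisher

    BacklogSketch(int bufferSize, int subscribers) {
        this.bufferSize = bufferSize;
        this.subscriberSequences = new long[subscribers];
        Arrays.fill(subscriberSequences, -1L);
    }

    /** Publishes one signal, or returns false when the backlog is fully booked. */
    boolean tryPublish() {
        long slowest = Long.MAX_VALUE;
        for (long sequence : subscriberSequences) {
            slowest = Math.min(slowest, sequence);
        }
        long next = published + 1;
        if (next - slowest > bufferSize) {
            return false; // the slot still holds a signal unread by the slowest subscriber
        }
        published = next;
        return true;
    }

    /** Advances one subscriber by a single signal, if one is available. */
    void consume(int subscriber) {
        if (subscriberSequences[subscriber] < published) {
            subscriberSequences[subscriber]++;
        }
    }

    public static void main(String[] args) {
        BacklogSketch ring = new BacklogSketch(4, 2);
        int accepted = 0;
        while (ring.tryPublish()) {
            accepted++;
        }
        System.out.println(accepted);          // 4: the backlog is fully booked
        for (int i = 0; i < 4; i++) {
            ring.consume(0);                   // subscriber 0 catches up completely
        }
        System.out.println(ring.tryPublish()); // false: subscriber 1 has read nothing
        ring.consume(1);
        System.out.println(ring.tryPublish()); // true: the slowest subscriber freed one slot
    }
}
```

With a backlog of 4, one idle subscriber is enough to stall the publisher, which is why a smaller backlog tolerates a smaller gap between subscriber rates.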
executor
autoCancel, SMALL_BUFFER_SIZE, SUBSCRIBER_COUNT, upstreamSubscription
Modifier and Type | Method and Description |
---|---|
static <E> RingBufferProcessor<E> | create(): Create a new RingBufferProcessor using ReactorProcessor.SMALL_BUFFER_SIZE backlog size, blockingWait Strategy and auto-cancel. |
static <E> RingBufferProcessor<E> | create(boolean autoCancel): Create a new RingBufferProcessor using ReactorProcessor.SMALL_BUFFER_SIZE backlog size, blockingWait Strategy and the passed auto-cancel setting. |
static <E> RingBufferProcessor<E> | create(ExecutorService service): Create a new RingBufferProcessor using ReactorProcessor.SMALL_BUFFER_SIZE backlog size, blockingWait Strategy and auto-cancel. |
static <E> RingBufferProcessor<E> | create(ExecutorService service, boolean autoCancel): Create a new RingBufferProcessor using ReactorProcessor.SMALL_BUFFER_SIZE backlog size, blockingWait Strategy and the passed auto-cancel setting. |
static <E> RingBufferProcessor<E> | create(ExecutorService service, int bufferSize): Create a new RingBufferProcessor using the passed backlog size, blockingWait Strategy and auto-cancel. |
static <E> RingBufferProcessor<E> | create(ExecutorService service, int bufferSize, boolean autoCancel): Create a new RingBufferProcessor using the passed backlog size, blockingWait Strategy and the passed auto-cancel setting. |
static <E> RingBufferProcessor<E> | create(ExecutorService service, int bufferSize, com.lmax.disruptor.WaitStrategy strategy): Create a new RingBufferProcessor using the passed backlog size, wait strategy and auto-cancel. |
static <E> RingBufferProcessor<E> | create(ExecutorService service, int bufferSize, com.lmax.disruptor.WaitStrategy strategy, boolean autoCancel): Create a new RingBufferProcessor using the passed backlog size, wait strategy and auto-cancel settings. |
static <E> RingBufferProcessor<E> | create(String name, int bufferSize): Create a new RingBufferProcessor using the passed backlog size, blockingWait Strategy and auto-cancel. |
static <E> RingBufferProcessor<E> | create(String name, int bufferSize, boolean autoCancel): Create a new RingBufferProcessor using the blockingWait Strategy, passed backlog size and auto-cancel settings. |
static <E> RingBufferProcessor<E> | create(String name, int bufferSize, com.lmax.disruptor.WaitStrategy strategy): Create a new RingBufferProcessor using the passed backlog size, wait strategy and auto-cancel. |
static <E> RingBufferProcessor<E> | create(String name, int bufferSize, com.lmax.disruptor.WaitStrategy strategy, boolean autoCancel): Create a new RingBufferProcessor using the passed backlog size, wait strategy and auto-cancel settings. |
long | getAvailableCapacity() |
long | getCapacity(): Return defined element capacity, used to drive new Subscription request needs. |
void | onComplete() |
void | onError(Throwable t) |
void | onNext(E o) |
static <E> RingBufferProcessor<E> | share(): Create a new RingBufferProcessor using ReactorProcessor.SMALL_BUFFER_SIZE backlog size, blockingWait Strategy and auto-cancel. |
static <E> RingBufferProcessor<E> | share(boolean autoCancel): Create a new RingBufferProcessor using ReactorProcessor.SMALL_BUFFER_SIZE backlog size, blockingWait Strategy and the passed auto-cancel setting. |
static <E> RingBufferProcessor<E> | share(ExecutorService service): Create a new RingBufferProcessor using ReactorProcessor.SMALL_BUFFER_SIZE backlog size, blockingWait Strategy and auto-cancel. |
static <E> RingBufferProcessor<E> | share(ExecutorService service, boolean autoCancel): Create a new RingBufferProcessor using ReactorProcessor.SMALL_BUFFER_SIZE backlog size, blockingWait Strategy and the passed auto-cancel setting. |
static <E> RingBufferProcessor<E> | share(ExecutorService service, int bufferSize): Create a new RingBufferProcessor using the passed backlog size, blockingWait Strategy and auto-cancel. |
static <E> RingBufferProcessor<E> | share(ExecutorService service, int bufferSize, boolean autoCancel): Create a new RingBufferProcessor using the passed backlog size, blockingWait Strategy and the passed auto-cancel setting. |
static <E> RingBufferProcessor<E> | share(ExecutorService service, int bufferSize, com.lmax.disruptor.WaitStrategy strategy): Create a new RingBufferProcessor using the passed backlog size, wait strategy and auto-cancel. |
static <E> RingBufferProcessor<E> | share(ExecutorService service, int bufferSize, com.lmax.disruptor.WaitStrategy strategy, boolean autoCancel): Create a new RingBufferProcessor using the passed backlog size, wait strategy and auto-cancel settings. |
static <E> RingBufferProcessor<E> | share(String name, int bufferSize): Create a new RingBufferProcessor using the passed backlog size, blockingWait Strategy and auto-cancel. |
static <E> RingBufferProcessor<E> | share(String name, int bufferSize, boolean autoCancel): Create a new RingBufferProcessor using the blockingWait Strategy, passed backlog size and auto-cancel settings. |
static <E> RingBufferProcessor<E> | share(String name, int bufferSize, com.lmax.disruptor.WaitStrategy strategy): Create a new RingBufferProcessor using the passed backlog size, wait strategy and auto-cancel. |
static <E> RingBufferProcessor<E> | share(String name, int bufferSize, com.lmax.disruptor.WaitStrategy strategy, boolean autoCancel): Create a new RingBufferProcessor using the passed backlog size, wait strategy and auto-cancel settings. |
void | subscribe(org.reactivestreams.Subscriber<? super E> subscriber) |
String | toString() |
org.reactivestreams.Publisher<Void> | writeWith(org.reactivestreams.Publisher<? extends E> source) |
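Several factories above take a name instead of an ExecutorService and create a cached thread pool whose threads are qualified by that name. How the processor builds that pool is not shown in this page; the following is only a sketch of the general technique with the standard java.util.concurrent API. NamedExecutorSketch, namedCachedPool, the "name-N" thread naming scheme and the daemon flag are all assumptions made for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class NamedExecutorSketch {

    /** Cached pool whose threads are named "<name>-1", "<name>-2", ... (assumed scheme). */
    static ExecutorService namedCachedPool(String name) {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory = runnable -> {
            Thread thread = new Thread(runnable, name + "-" + counter.incrementAndGet());
            thread.setDaemon(true); // assumption: do not keep the JVM alive for idle workers
            return thread;
        };
        return Executors.newCachedThreadPool(factory);
    }

    /** Runs one task on a fresh pool and reports the name of the thread that ran it. */
    static String threadNameFor(String name) {
        ExecutorService pool = namedCachedPool(name);
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            return pool.submit(task).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(threadNameFor("ringbuffer")); // ringbuffer-1
    }
}
```

Named threads make it easier to tell subscriber event loops apart in thread dumps and logs.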
alive, awaitAndShutdown, awaitAndShutdown, forceShutdown, shutdown
accept, decrementSubscribers, incrementSubscribers, isReactivePull, onSubscribe
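The share factories differ from the create factories in that they authorize concurrent onNext calls. One common way for a ring buffer to support several producers is to claim slots through an atomic counter, so that no two threads ever write to the same sequence. Whether the processor does exactly this internally is an assumption here; SharedClaimSketch, claimNext and distinctClaims are hypothetical names for a minimal sketch of the idea.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

final class SharedClaimSketch {
    // Last claimed sequence; -1 means nothing has been claimed yet.
    private final AtomicLong cursor = new AtomicLong(-1L);

    /** Atomically claims the next sequence; safe to call from several producer threads. */
    long claimNext() {
        return cursor.incrementAndGet();
    }

    /** Starts the given number of producers and counts how many distinct sequences were claimed. */
    static int distinctClaims(int producers, int perProducer) {
        SharedClaimSketch ring = new SharedClaimSketch();
        Set<Long> claimed = ConcurrentHashMap.newKeySet();
        Thread[] threads = new Thread[producers];
        for (int i = 0; i < producers; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < perProducer; j++) {
                    claimed.add(ring.claimNext());
                }
            });
            threads[i].start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return claimed.size();
    }

    public static void main(String[] args) {
        // Every claim is unique, so 4 producers x 1000 claims yield 4000 distinct sequences.
        System.out.println(distinctClaims(4, 1000)); // 4000
    }
}
```

A processor obtained from create, by contrast, must only be signalled by one thread at a time, as the class description states.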
public static <E> RingBufferProcessor<E> create()
Create a new RingBufferProcessor using ReactorProcessor.SMALL_BUFFER_SIZE backlog size, blockingWait Strategy and auto-cancel.
A new Cached ThreadExecutorPool will be implicitly created.
E - Type of processed signals
public static <E> RingBufferProcessor<E> create(boolean autoCancel)
Create a new RingBufferProcessor using ReactorProcessor.SMALL_BUFFER_SIZE backlog size, blockingWait Strategy and the passed auto-cancel setting.
A new Cached ThreadExecutorPool will be implicitly created.
E - Type of processed signals
autoCancel - Should this propagate cancellation when unregistered by all subscribers?
public static <E> RingBufferProcessor<E> create(ExecutorService service)
Create a new RingBufferProcessor using ReactorProcessor.SMALL_BUFFER_SIZE backlog size, blockingWait Strategy and auto-cancel.
The passed ExecutorService will execute as many event loops consuming the ringbuffer as there are subscribers.
E - Type of processed signals
service - A provided ExecutorService to manage threading infrastructure
public static <E> RingBufferProcessor<E> create(ExecutorService service, boolean autoCancel)
Create a new RingBufferProcessor using ReactorProcessor.SMALL_BUFFER_SIZE backlog size, blockingWait Strategy and the passed auto-cancel setting.
The passed ExecutorService will execute as many event loops consuming the ringbuffer as there are subscribers.
E - Type of processed signals
service - A provided ExecutorService to manage threading infrastructure
autoCancel - Should this propagate cancellation when unregistered by all subscribers?
public static <E> RingBufferProcessor<E> create(String name, int bufferSize)
Create a new RingBufferProcessor using the passed backlog size, blockingWait Strategy and auto-cancel.
A new Cached ThreadExecutorPool will be implicitly created and will use the passed name to qualify the created threads.
E - Type of processed signals
name - Use a new Cached ExecutorService and assign this name to the created threads
bufferSize - A Backlog Size to mitigate slow subscribers
public static <E> RingBufferProcessor<E> create(String name, int bufferSize, boolean autoCancel)
Create a new RingBufferProcessor using the blockingWait Strategy, passed backlog size and auto-cancel settings.
A new Cached ThreadExecutorPool will be implicitly created and will use the passed name to qualify the created threads.
E - Type of processed signals
name - Use a new Cached ExecutorService and assign this name to the created threads
bufferSize - A Backlog Size to mitigate slow subscribers
autoCancel - Should this propagate cancellation when unregistered by all subscribers?
public static <E> RingBufferProcessor<E> create(ExecutorService service, int bufferSize)
Create a new RingBufferProcessor using the passed backlog size, blockingWait Strategy and auto-cancel.
The passed ExecutorService will execute as many event loops consuming the ringbuffer as there are subscribers.
E - Type of processed signals
service - A provided ExecutorService to manage threading infrastructure
bufferSize - A Backlog Size to mitigate slow subscribers
public static <E> RingBufferProcessor<E> create(ExecutorService service, int bufferSize, boolean autoCancel)
Create a new RingBufferProcessor using the passed backlog size, blockingWait Strategy and the passed auto-cancel setting.
The passed ExecutorService will execute as many event loops consuming the ringbuffer as there are subscribers.
E - Type of processed signals
service - A provided ExecutorService to manage threading infrastructure
bufferSize - A Backlog Size to mitigate slow subscribers
autoCancel - Should this propagate cancellation when unregistered by all subscribers?
public static <E> RingBufferProcessor<E> create(String name, int bufferSize, com.lmax.disruptor.WaitStrategy strategy)
Create a new RingBufferProcessor using the passed backlog size, wait strategy and auto-cancel.
A new Cached ThreadExecutorPool will be implicitly created and will use the passed name to qualify the created threads.
E - Type of processed signals
name - Use a new Cached ExecutorService and assign this name to the created threads
bufferSize - A Backlog Size to mitigate slow subscribers
strategy - A RingBuffer WaitStrategy to use instead of the default BlockingWaitStrategy.
public static <E> RingBufferProcessor<E> create(String name, int bufferSize, com.lmax.disruptor.WaitStrategy strategy, boolean autoCancel)
Create a new RingBufferProcessor using the passed backlog size, wait strategy and auto-cancel settings.
A new Cached ThreadExecutorPool will be implicitly created and will use the passed name to qualify the created threads.
E - Type of processed signals
name - Use a new Cached ExecutorService and assign this name to the created threads
bufferSize - A Backlog Size to mitigate slow subscribers
strategy - A RingBuffer WaitStrategy to use instead of the default BlockingWaitStrategy.
autoCancel - Should this propagate cancellation when unregistered by all subscribers?
public static <E> RingBufferProcessor<E> create(ExecutorService service, int bufferSize, com.lmax.disruptor.WaitStrategy strategy)
Create a new RingBufferProcessor using the passed backlog size, wait strategy and auto-cancel.
The passed ExecutorService will execute as many event loops consuming the ringbuffer as there are subscribers.
E - Type of processed signals
service - A provided ExecutorService to manage threading infrastructure
bufferSize - A Backlog Size to mitigate slow subscribers
strategy - A RingBuffer WaitStrategy to use instead of the default BlockingWaitStrategy.
public static <E> RingBufferProcessor<E> create(ExecutorService service, int bufferSize, com.lmax.disruptor.WaitStrategy strategy, boolean autoCancel)
Create a new RingBufferProcessor using the passed backlog size, wait strategy and auto-cancel settings.
The passed ExecutorService will execute as many event loops consuming the ringbuffer as there are subscribers.
E - Type of processed signals
service - A provided ExecutorService to manage threading infrastructure
bufferSize - A Backlog Size to mitigate slow subscribers
strategy - A RingBuffer WaitStrategy to use instead of the default BlockingWaitStrategy.
autoCancel - Should this propagate cancellation when unregistered by all subscribers?
public static <E> RingBufferProcessor<E> share()
Create a new RingBufferProcessor using ReactorProcessor.SMALL_BUFFER_SIZE backlog size, blockingWait Strategy and auto-cancel.
A Shared Processor authorizes concurrent onNext calls and is suited for multi-threaded publishers that fan in data.
A new Cached ThreadExecutorPool will be implicitly created.
E - Type of processed signals
public static <E> RingBufferProcessor<E> share(boolean autoCancel)
Create a new RingBufferProcessor using ReactorProcessor.SMALL_BUFFER_SIZE backlog size, blockingWait Strategy and the passed auto-cancel setting.
A Shared Processor authorizes concurrent onNext calls and is suited for multi-threaded publishers that fan in data.
A new Cached ThreadExecutorPool will be implicitly created.
E - Type of processed signals
autoCancel - Should this propagate cancellation when unregistered by all subscribers?
public static <E> RingBufferProcessor<E> share(ExecutorService service)
Create a new RingBufferProcessor using ReactorProcessor.SMALL_BUFFER_SIZE backlog size, blockingWait Strategy and auto-cancel.
A Shared Processor authorizes concurrent onNext calls and is suited for multi-threaded publishers that fan in data.
The passed ExecutorService will execute as many event loops consuming the ringbuffer as there are subscribers.
E - Type of processed signals
service - A provided ExecutorService to manage threading infrastructure
public static <E> RingBufferProcessor<E> share(ExecutorService service, boolean autoCancel)
Create a new RingBufferProcessor using ReactorProcessor.SMALL_BUFFER_SIZE backlog size, blockingWait Strategy and the passed auto-cancel setting.
A Shared Processor authorizes concurrent onNext calls and is suited for multi-threaded publishers that fan in data.
The passed ExecutorService will execute as many event loops consuming the ringbuffer as there are subscribers.
E - Type of processed signals
service - A provided ExecutorService to manage threading infrastructure
autoCancel - Should this propagate cancellation when unregistered by all subscribers?
public static <E> RingBufferProcessor<E> share(String name, int bufferSize)
Create a new RingBufferProcessor using the passed backlog size, blockingWait Strategy and auto-cancel.
A Shared Processor authorizes concurrent onNext calls and is suited for multi-threaded publishers that fan in data.
A new Cached ThreadExecutorPool will be implicitly created and will use the passed name to qualify the created threads.
E - Type of processed signals
name - Use a new Cached ExecutorService and assign this name to the created threads
bufferSize - A Backlog Size to mitigate slow subscribers
public static <E> RingBufferProcessor<E> share(String name, int bufferSize, boolean autoCancel)
Create a new RingBufferProcessor using the blockingWait Strategy, passed backlog size and auto-cancel settings.
A Shared Processor authorizes concurrent onNext calls and is suited for multi-threaded publishers that fan in data.
A new Cached ThreadExecutorPool will be implicitly created and will use the passed name to qualify the created threads.
E - Type of processed signals
name - Use a new Cached ExecutorService and assign this name to the created threads
bufferSize - A Backlog Size to mitigate slow subscribers
autoCancel - Should this propagate cancellation when unregistered by all subscribers?
public static <E> RingBufferProcessor<E> share(ExecutorService service, int bufferSize)
Create a new RingBufferProcessor using the passed backlog size, blockingWait Strategy and auto-cancel.
A Shared Processor authorizes concurrent onNext calls and is suited for multi-threaded publishers that fan in data.
The passed ExecutorService will execute as many event loops consuming the ringbuffer as there are subscribers.
E - Type of processed signals
service - A provided ExecutorService to manage threading infrastructure
bufferSize - A Backlog Size to mitigate slow subscribers
public static <E> RingBufferProcessor<E> share(ExecutorService service, int bufferSize, boolean autoCancel)
Create a new RingBufferProcessor using the passed backlog size, blockingWait Strategy and the passed auto-cancel setting.
A Shared Processor authorizes concurrent onNext calls and is suited for multi-threaded publishers that fan in data.
The passed ExecutorService will execute as many event loops consuming the ringbuffer as there are subscribers.
E - Type of processed signals
service - A provided ExecutorService to manage threading infrastructure
bufferSize - A Backlog Size to mitigate slow subscribers
autoCancel - Should this propagate cancellation when unregistered by all subscribers?
public static <E> RingBufferProcessor<E> share(String name, int bufferSize, com.lmax.disruptor.WaitStrategy strategy)
Create a new RingBufferProcessor using the passed backlog size, wait strategy and auto-cancel.
A Shared Processor authorizes concurrent onNext calls and is suited for multi-threaded publishers that fan in data.
A new Cached ThreadExecutorPool will be implicitly created and will use the passed name to qualify the created threads.
E - Type of processed signals
name - Use a new Cached ExecutorService and assign this name to the created threads
bufferSize - A Backlog Size to mitigate slow subscribers
strategy - A RingBuffer WaitStrategy to use instead of the default BlockingWaitStrategy.
public static <E> RingBufferProcessor<E> share(String name, int bufferSize, com.lmax.disruptor.WaitStrategy strategy, boolean autoCancel)
Create a new RingBufferProcessor using the passed backlog size, wait strategy and auto-cancel settings.
A Shared Processor authorizes concurrent onNext calls and is suited for multi-threaded publishers that fan in data.
A new Cached ThreadExecutorPool will be implicitly created and will use the passed name to qualify the created threads.
E - Type of processed signals
name - Use a new Cached ExecutorService and assign this name to the created threads
bufferSize - A Backlog Size to mitigate slow subscribers
strategy - A RingBuffer WaitStrategy to use instead of the default BlockingWaitStrategy.
autoCancel - Should this propagate cancellation when unregistered by all subscribers?
public static <E> RingBufferProcessor<E> share(ExecutorService service, int bufferSize, com.lmax.disruptor.WaitStrategy strategy)
Create a new RingBufferProcessor using the passed backlog size, wait strategy and auto-cancel.
A Shared Processor authorizes concurrent onNext calls and is suited for multi-threaded publishers that fan in data.
The passed ExecutorService will execute as many event loops consuming the ringbuffer as there are subscribers.
E - Type of processed signals
service - A provided ExecutorService to manage threading infrastructure
bufferSize - A Backlog Size to mitigate slow subscribers
strategy - A RingBuffer WaitStrategy to use instead of the default BlockingWaitStrategy.
public static <E> RingBufferProcessor<E> share(ExecutorService service, int bufferSize, com.lmax.disruptor.WaitStrategy strategy, boolean autoCancel)
Create a new RingBufferProcessor using the passed backlog size, wait strategy and auto-cancel settings.
A Shared Processor authorizes concurrent onNext calls and is suited for multi-threaded publishers that fan in data.
The passed ExecutorService will execute as many event loops consuming the ringbuffer as there are subscribers.
E - Type of processed signals
service - A provided ExecutorService to manage threading infrastructure
bufferSize - A Backlog Size to mitigate slow subscribers
strategy - A RingBuffer WaitStrategy to use instead of the default BlockingWaitStrategy.
autoCancel - Should this propagate cancellation when unregistered by all subscribers?
public void subscribe(org.reactivestreams.Subscriber<? super E> subscriber)
public void onNext(E o)
public void onError(Throwable t)
public void onComplete()
onComplete in interface org.reactivestreams.Subscriber<E>
onComplete in class ExecutorPoweredProcessor<E,E>
public org.reactivestreams.Publisher<Void> writeWith(org.reactivestreams.Publisher<? extends E> source)
public long getAvailableCapacity()
getAvailableCapacity in class ReactorProcessor<E,E>
public long getCapacity()
Description copied from interface: NonBlocking
Return defined element capacity, used to drive new Subscription request needs. This is the maximum in-flight data allowed to transit to this element.
getCapacity in interface NonBlocking
getCapacity in class ReactorProcessor<E,E>
Copyright © 2016. All rights reserved.