Package org.jboss.netty.handler.queue
Class BufferedWriteHandler
java.lang.Object
org.jboss.netty.channel.SimpleChannelHandler
org.jboss.netty.handler.queue.BufferedWriteHandler
All Implemented Interfaces:
ChannelDownstreamHandler, ChannelHandler, ChannelUpstreamHandler, LifeCycleAwareChannelHandler
public class BufferedWriteHandler
extends SimpleChannelHandler
implements LifeCycleAwareChannelHandler
Emulates a buffered write operation. This handler stores all write requests
in an unbounded Queue and flushes them downstream when flush() is called.
Here is an example that demonstrates the usage:
BufferedWriteHandler bufferedWriter = new BufferedWriteHandler();
ChannelPipeline p = ...;
p.addFirst("buffer", bufferedWriter);
...
Channel ch = ...;
// msg1, 2, and 3 are stored in the queue of bufferedWriter.
ch.write(msg1);
ch.write(msg2);
ch.write(msg3);
// and will be flushed on request.
bufferedWriter.flush();
Auto-flush
The write request queue is automatically flushed when the associated Channel is disconnected or closed. It is not flushed
automatically in any other case, so you have to call flush() before the queue
grows too large. You can implement your own auto-flush
strategy by extending this handler:
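import java.util.concurrent.atomic.AtomicLong;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.handler.queue.BufferedWriteHandler;

public class AutoFlusher extends BufferedWriteHandler {
    private final AtomicLong bufferSize = new AtomicLong();

    @Override
    public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        super.writeRequested(ctx, e);
        // This example assumes that every message is a ChannelBuffer.
        ChannelBuffer data = (ChannelBuffer) e.getMessage();
        // addAndGet returns a long, so the running total is kept as a long.
        long newBufferSize = bufferSize.addAndGet(data.readableBytes());
        // Flush the queue if it gets larger than 8KiB.
        if (newBufferSize > 8192) {
            flush();
            bufferSize.set(0);
        }
    }
}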
Consolidate on flush
If there are two or more write requests in the queue and the message type of every one of them is ChannelBuffer, they can be merged into a single write request
to reduce the number of system calls.
BEFORE consolidation:            AFTER consolidation:
+-------+-------+-------+        +-------------+
| Req C | Req B | Req A |------\\| Request ABC |
| "789" | "456" | "123" |------//| "123456789" |
+-------+-------+-------+        +-------------+
This feature is disabled by default. You can override the default when you create this handler or call
flush(boolean). If you specify
true when you call the constructor, calling flush() will
always consolidate the queue. Otherwise, you have to call
flush(boolean) with true to enable this feature for each
flush.
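To make the merge step concrete, here is a minimal, library-independent sketch of what consolidation amounts to. It models each queued message as a plain byte[] rather than a ChannelBuffer. The class ConsolidationSketch and its consolidate method are hypothetical names chosen for illustration; this is not Netty's implementation.

```java
import java.io.ByteArrayOutputStream;
import java.util.Queue;

// Hypothetical model of consolidation; not part of Netty.
final class ConsolidationSketch {

    private ConsolidationSketch() {
    }

    // Drains the queue in FIFO order and concatenates all payloads into one array,
    // so that a single write can replace several small ones.
    static byte[] consolidate(Queue<byte[]> pendingWrites) {
        ByteArrayOutputStream merged = new ByteArrayOutputStream();
        byte[] next;
        while ((next = pendingWrites.poll()) != null) {
            merged.write(next, 0, next.length);
        }
        return merged.toByteArray();
    }
}
```

Given a queue holding the ASCII bytes of "123", "456" and "789" in that order, consolidate returns the bytes of "123456789" and leaves the queue empty, which mirrors the diagram above.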
The disadvantage of consolidation is that the ChannelFuture and its
ChannelFutureListeners associated with the original write requests
might be notified later than when they are actually written out. They will
always be notified when the consolidated write request is fully written.
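The notification delay can be illustrated with a small model. In the sketch below, java.util.concurrent.CompletableFuture stands in for ChannelFuture, and the class ConsolidatedFutureSketch is a hypothetical name. It shows only the general idea that the original futures complete together with the consolidated one. Propagating a failure to every original future is an assumption of this sketch, not something the description above states.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical model; CompletableFuture stands in for Netty's ChannelFuture.
final class ConsolidatedFutureSketch {

    private ConsolidatedFutureSketch() {
    }

    // Returns one future for the merged write. Completing it completes every
    // original future, so none of them is notified before the whole merged
    // write has finished.
    static CompletableFuture<Void> consolidate(List<CompletableFuture<Void>> originalFutures) {
        final List<CompletableFuture<Void>> snapshot =
                new ArrayList<CompletableFuture<Void>>(originalFutures);
        CompletableFuture<Void> consolidated = new CompletableFuture<Void>();
        consolidated.whenComplete((ignored, cause) -> {
            for (CompletableFuture<Void> original : snapshot) {
                if (cause == null) {
                    original.complete(null);
                } else {
                    // Assumption of this sketch: a failure is passed on to every original future.
                    original.completeExceptionally(cause);
                }
            }
        });
        return consolidated;
    }
}
```

In this model, the future of "Req A" stays incomplete until the merged future is completed, even though its bytes would be the first to reach the socket.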
The following example implements the consolidation strategy that reduces the number of write requests based on the writability of a channel:
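import org.jboss.netty.channel.ChannelConfig;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.socket.nio.NioSocketChannelConfig;
import org.jboss.netty.handler.queue.BufferedWriteHandler;

public class ConsolidatingAutoFlusher extends BufferedWriteHandler {
    public ConsolidatingAutoFlusher() {
        // Enable consolidation by default.
        super(true);
    }

    @Override
    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        ChannelConfig cfg = e.getChannel().getConfig();
        if (cfg instanceof NioSocketChannelConfig) {
            // Lower the watermark to increase the chance of consolidation.
            ((NioSocketChannelConfig) cfg).setWriteBufferLowWaterMark(0);
        }
        super.channelOpen(ctx, e);
    }

    @Override
    public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        super.writeRequested(ctx, e);
        // Flush immediately while the channel can still accept writes.
        if (e.getChannel().isWritable()) {
            flush();
        }
    }

    @Override
    public void channelInterestChanged(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        // Flush whatever accumulated while the channel was not writable.
        if (e.getChannel().isWritable()) {
            flush();
        }
    }
}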
Prioritized Writes
You can implement prioritized writes by specifying an unbounded priority queue in the constructor of this handler. You then need to design a strategy that determines how often flush() should be called.
For example, you could call flush() periodically, using
HashedWheelTimer every second.
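The effect of a priority queue on the flush order can be sketched without Netty. The example below is a model only: PrioritizedWriteSketch and PrioritizedMessage are hypothetical names, the convention that a lower number means a higher priority is an assumption, and the periodic timer that would call flush() is left out.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical model of prioritized buffering; not part of Netty.
final class PrioritizedWriteSketch {

    // Hypothetical message type. Assumption: a lower value is written first.
    static final class PrioritizedMessage {
        final int priority;
        final String payload;

        PrioritizedMessage(int priority, String payload) {
            this.priority = priority;
            this.payload = payload;
        }
    }

    // Unbounded and thread-safe, as the handler requires of its queue.
    private final PriorityBlockingQueue<PrioritizedMessage> queue =
            new PriorityBlockingQueue<PrioritizedMessage>(
                    11, Comparator.comparingInt((PrioritizedMessage m) -> m.priority));

    // Buffers the message instead of writing it immediately.
    void write(PrioritizedMessage message) {
        queue.add(message);
    }

    // Drains everything queued so far, most urgent message first, and returns
    // the payloads in the order they would be written.
    List<String> flush() {
        List<String> written = new ArrayList<String>();
        PrioritizedMessage next;
        while ((next = queue.poll()) != null) {
            written.add(next.payload);
        }
        return written;
    }
}
```

The flush interval matters in such a design: messages are reordered only among those that are queued at the same time, so a very short interval leaves little to prioritize, while a long one adds latency to every message.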
Nested Class Summary
Nested classes/interfaces inherited from interface org.jboss.netty.channel.ChannelHandler
ChannelHandler.Sharable
Field Summary
Fields
Modifier and Type                    Field
private final boolean                consolidateOnFlush
private ChannelHandlerContext        ctx
private final AtomicBoolean          flush
private final Queue<MessageEvent>    queue
Constructor Summary
Constructors
BufferedWriteHandler()
    Creates a new instance with the default unbounded BlockingQueue implementation and without buffer consolidation.
BufferedWriteHandler(boolean consolidateOnFlush)
    Creates a new instance with ConcurrentLinkedQueue.
BufferedWriteHandler(Queue<MessageEvent> queue)
    Creates a new instance with the specified thread-safe unbounded Queue and without buffer consolidation.
BufferedWriteHandler(Queue<MessageEvent> queue, boolean consolidateOnFlush)
    Creates a new instance with the specified thread-safe unbounded Queue.
Method Summary
Modifier and Type, Method, Description
void afterAdd
void afterRemove
    Fail all buffered writes that are left.
void beforeAdd
void beforeRemove
void channelClosed
    Fail all buffered writes that are left.
void closeRequested
    Invoked when Channel.close() was called.
private List<MessageEvent> consolidatedWrite(List<MessageEvent> pendingWrites)
void disconnectRequested
    Invoked when Channel.disconnect() was called.
void flush()
    Sends the queued write requests to the downstream.
void flush(boolean consolidateOnFlush)
    Sends the queued write requests to the downstream.
protected Queue<MessageEvent> getQueue()
    Returns the queue which stores the write requests.
boolean isConsolidateOnFlush
void writeRequested
    Stores all write requests to the queue so that they are actually written on flush().
Methods inherited from class org.jboss.netty.channel.SimpleChannelHandler
bindRequested, channelBound, channelConnected, channelDisconnected, channelInterestChanged, channelOpen, channelUnbound, childChannelClosed, childChannelOpen, connectRequested, exceptionCaught, handleDownstream, handleUpstream, messageReceived, setInterestOpsRequested, unbindRequested, writeComplete
Field Details

queue
private final Queue<MessageEvent> queue

consolidateOnFlush
private final boolean consolidateOnFlush

ctx
private ChannelHandlerContext ctx

flush
private final AtomicBoolean flush

Constructor Details

BufferedWriteHandler
public BufferedWriteHandler()
Creates a new instance with the default unbounded BlockingQueue implementation and without buffer consolidation.

BufferedWriteHandler
public BufferedWriteHandler(Queue<MessageEvent> queue)
Creates a new instance with the specified thread-safe unbounded Queue and without buffer consolidation.

BufferedWriteHandler
public BufferedWriteHandler(boolean consolidateOnFlush)
Creates a new instance with ConcurrentLinkedQueue.
Parameters:
consolidateOnFlush - true if and only if the buffered write requests are merged into a single write request on flush()

BufferedWriteHandler
public BufferedWriteHandler(Queue<MessageEvent> queue, boolean consolidateOnFlush)
Creates a new instance with the specified thread-safe unbounded Queue. Please note that specifying a bounded Queue or a thread-unsafe Queue will result in unspecified behavior.
Parameters:
consolidateOnFlush - true if and only if the buffered write requests are merged into a single write request on flush()

Method Details

isConsolidateOnFlush
public boolean isConsolidateOnFlush()

getQueue
protected Queue<MessageEvent> getQueue()
Returns the queue which stores the write requests. The default implementation returns the queue which was specified in the constructor.

flush
public void flush()
Sends the queued write requests to the downstream.

flush
public void flush(boolean consolidateOnFlush)
Sends the queued write requests to the downstream.
Parameters:
consolidateOnFlush - true if and only if the buffered write requests are merged into a single write request

consolidatedWrite
private List<MessageEvent> consolidatedWrite(List<MessageEvent> pendingWrites)

writeRequested
Stores all write requests to the queue so that they are actually written on flush().
Overrides:
writeRequested in class SimpleChannelHandler
Throws:
Exception

disconnectRequested
Description copied from class: SimpleChannelHandler
Invoked when Channel.disconnect() was called.
Overrides:
disconnectRequested in class SimpleChannelHandler
Throws:
Exception

closeRequested
Description copied from class: SimpleChannelHandler
Invoked when Channel.close() was called.
Overrides:
closeRequested in class SimpleChannelHandler
Throws:
Exception

channelClosed
Fail all buffered writes that are left. See #308 (https://github.com/netty/netty/issues/308) for more details.
Overrides:
channelClosed in class SimpleChannelHandler
Throws:
Exception

beforeAdd
Specified by:
beforeAdd in interface LifeCycleAwareChannelHandler
Throws:
Exception

afterAdd
Specified by:
afterAdd in interface LifeCycleAwareChannelHandler
Throws:
Exception

beforeRemove
Specified by:
beforeRemove in interface LifeCycleAwareChannelHandler
Throws:
Exception

afterRemove
Fail all buffered writes that are left. See #308 (https://github.com/netty/netty/issues/308) for more details.
Specified by:
afterRemove in interface LifeCycleAwareChannelHandler
Throws:
Exception