Class ReplayingDecoder<T extends Enum<T>>
- Type Parameters:
T - the state type; use VoidEnum if state management is unused
- All Implemented Interfaces:
ChannelHandler, ChannelUpstreamHandler, LifeCycleAwareChannelHandler
- Direct Known Subclasses:
CompatibleMarshallingDecoder, HttpMessageDecoder, SocksAuthRequestDecoder, SocksAuthResponseDecoder, SocksCmdRequestDecoder, SocksCmdResponseDecoder, SocksInitRequestDecoder, SocksInitResponseDecoder, WebSocket00FrameDecoder, WebSocket08FrameDecoder
A specialized variation of FrameDecoder which enables implementation
of a non-blocking decoder in the blocking I/O paradigm.
The biggest difference between ReplayingDecoder and
FrameDecoder is that ReplayingDecoder allows you to
implement the decode() and decodeLast() methods as if
all required bytes had already been received, rather than checking the
availability of the required bytes. For example, the following
FrameDecoder implementation:
public class IntegerHeaderFrameDecoder extends FrameDecoder {
    @Override
    protected Object decode(ChannelHandlerContext ctx,
                            Channel channel,
                            ChannelBuffer buf) throws Exception {
        if (buf.readableBytes() < 4) {
            return null;
        }
        buf.markReaderIndex();
        int length = buf.readInt();
        if (buf.readableBytes() < length) {
            buf.resetReaderIndex();
            return null;
        }
        return buf.readBytes(length);
    }
}
is simplified like the following with ReplayingDecoder:
public class IntegerHeaderFrameDecoder
        extends ReplayingDecoder<VoidEnum> {
    protected Object decode(ChannelHandlerContext ctx,
                            Channel channel,
                            ChannelBuffer buf,
                            VoidEnum state) throws Exception {
        return buf.readBytes(buf.readInt());
    }
}
How does this work?
ReplayingDecoder passes a specialized ChannelBuffer
implementation which throws an Error of a certain type when there is not
enough data in the buffer. In the IntegerHeaderFrameDecoder above,
you simply assumed that there would be 4 or more bytes in the buffer when
you called buf.readInt(). If there really are 4 bytes in the buffer,
it returns the integer header as you expected. Otherwise, the
Error is raised and control returns to
ReplayingDecoder. When ReplayingDecoder catches the
Error, it rewinds the readerIndex of the buffer
back to the 'initial' position (i.e. the beginning of the buffer) and calls
the decode(..) method again when more data is received into the
buffer.
Please note that ReplayingDecoder always throws the same cached
Error instance to avoid the overhead of creating a new Error
and filling its stack trace for every throw.
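The replay mechanism described above can be illustrated outside Netty. The following is a minimal plain-Java sketch, not the actual ReplayingDecoder implementation: ReplaySketch, ReplaySignal, ReplayableBuffer and feed are hypothetical names introduced only for this example. It shows a buffer whose reads throw one cached Error without a stack trace, and a driver that rewinds the reader index and retries once more bytes arrive.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java sketch of the replay idea. None of these names are Netty APIs. */
public class ReplaySketch {

    /** Signal thrown when a read needs more bytes: one cached instance, no stack trace. */
    static final class ReplaySignal extends Error {
        static final ReplaySignal INSTANCE = new ReplaySignal();

        private ReplaySignal() {
            super(null, null, false, false); // suppression and stack trace disabled
        }
    }

    /** Minimal cumulative buffer whose reads throw instead of returning partial data. */
    static final class ReplayableBuffer {
        private byte[] data = new byte[0];
        int readerIndex;

        void append(byte[] chunk) {
            int oldLength = data.length;
            data = Arrays.copyOf(data, oldLength + chunk.length);
            System.arraycopy(chunk, 0, data, oldLength, chunk.length);
        }

        int readInt() {
            if (data.length - readerIndex < 4) {
                throw ReplaySignal.INSTANCE;
            }
            int value = 0;
            for (int i = 0; i < 4; i++) {
                value = (value << 8) | (data[readerIndex++] & 0xFF);
            }
            return value;
        }

        byte[] readBytes(int length) {
            if (data.length - readerIndex < length) {
                throw ReplaySignal.INSTANCE;
            }
            byte[] out = Arrays.copyOfRange(data, readerIndex, readerIndex + length);
            readerIndex += length;
            return out;
        }
    }

    /** The decoder body, written as if all required bytes had already arrived. */
    static byte[] decode(ReplayableBuffer buf) {
        return buf.readBytes(buf.readInt());
    }

    /** Appends a chunk, decodes every complete frame, and rewinds on the signal. */
    static List<String> feed(ReplayableBuffer buf, byte[] chunk) {
        buf.append(chunk);
        List<String> frames = new ArrayList<>();
        while (true) {
            int initial = buf.readerIndex; // the 'initial' position for this attempt
            try {
                frames.add(new String(decode(buf), StandardCharsets.US_ASCII));
            } catch (ReplaySignal signal) {
                buf.readerIndex = initial; // rewind; decode() is replayed on the next feed
                return frames;
            }
        }
    }

    public static void main(String[] args) {
        ReplayableBuffer buf = new ReplayableBuffer();
        System.out.println(feed(buf, new byte[] {0, 0}));      // prints []
        System.out.println(feed(buf, new byte[] {0, 3, 'a'})); // prints []
        System.out.println(feed(buf, new byte[] {'b', 'c'}));  // prints [abc]
    }
}
```

Note that the second call parses the length header successfully and only fails on the body, so the third call parses the header again. That repeated work is the cost discussed under Limitations.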
Limitations
In exchange for this simplicity, ReplayingDecoder imposes a few
limitations:
- Some buffer operations are prohibited.
- Performance can be worse if the network is slow and the message format is complicated, unlike the example above. In this case, your decoder might have to decode the same part of the message over and over again.
- You must keep in mind that the decode(..) method can be called many times to decode a single message. For example, the following code will not work:
    public class MyDecoder extends ReplayingDecoder<VoidEnum> {
        private final Queue<Integer> values = new LinkedList<Integer>();
        @Override
        public Object decode(.., ChannelBuffer buffer, ..) throws Exception {
            // A message contains 2 integers.
            values.offer(buffer.readInt());
            values.offer(buffer.readInt());
            // This assertion will fail intermittently since values.offer()
            // can be called more than two times!
            assert values.size() == 2;
            return values.poll() + values.poll();
        }
    }
  The correct implementation looks like the following, and you can also utilize the 'checkpoint' feature which is explained in detail in the next section.
    public class MyDecoder extends ReplayingDecoder<VoidEnum> {
        private final Queue<Integer> values = new LinkedList<Integer>();
        @Override
        public Object decode(.., ChannelBuffer buffer, ..) throws Exception {
            // Revert the state of the variable that might have been changed
            // since the last partial decode.
            values.clear();
            // A message contains 2 integers.
            values.offer(buffer.readInt());
            values.offer(buffer.readInt());
            // Now we know this assertion will never fail.
            assert values.size() == 2;
            return values.poll() + values.poll();
        }
    }
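The effect of a replayed decode(..) on decoder fields can be reproduced without Netty. The sketch below is an illustration under assumptions, not Netty code: ReplaySideEffectSketch, bufferOf and queueSizeAfterReplay are hypothetical names, and java.nio.ByteBuffer with its BufferUnderflowException stands in for the replaying buffer and its Error. It counts how many values the queue holds after one partial attempt followed by a full replay.

```java
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.Queue;

/** Plain-Java sketch; BufferUnderflowException stands in for the replay Error. */
public class ReplaySideEffectSketch {
    private final Queue<Integer> values = new LinkedList<>();
    private final boolean clearFirst;

    ReplaySideEffectSketch(boolean clearFirst) {
        this.clearFirst = clearFirst;
    }

    /** Reads two ints and returns how many values the queue holds afterwards. */
    int decode(ByteBuffer buffer) {
        if (clearFirst) {
            values.clear(); // discard leftovers from an earlier partial decode
        }
        values.offer(buffer.getInt());
        values.offer(buffer.getInt()); // throws if the second int has not arrived
        return values.size();
    }

    /** Builds a buffer containing the given ints, ready for reading. */
    static ByteBuffer bufferOf(int... ints) {
        ByteBuffer buffer = ByteBuffer.allocate(4 * ints.length);
        for (int i : ints) {
            buffer.putInt(i);
        }
        buffer.flip();
        return buffer;
    }

    /** Runs one partial attempt and one full replay, then reports the queue size. */
    static int queueSizeAfterReplay(boolean clearFirst) {
        ReplaySideEffectSketch decoder = new ReplaySideEffectSketch(clearFirst);
        try {
            decoder.decode(bufferOf(1)); // only the first int has arrived
        } catch (BufferUnderflowException needMoreData) {
            // a replaying decoder would rewind here and wait for more data
        }
        return decoder.decode(bufferOf(1, 2)); // replay from the start with both ints
    }

    public static void main(String[] args) {
        System.out.println(queueSizeAfterReplay(false)); // prints 3
        System.out.println(queueSizeAfterReplay(true));  // prints 2
    }
}
```

Without the clear() call, the value offered during the partial attempt survives into the replay, which is why the queue ends up with three entries instead of two.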
Improving the performance
Fortunately, the performance of a complex decoder implementation can be
improved significantly with the checkpoint() method. The
checkpoint() method updates the 'initial' position of the buffer so
that ReplayingDecoder rewinds the readerIndex of the buffer
to the last position where you called the checkpoint() method.
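To make the saving concrete, here is a minimal plain-Java sketch, assuming a length-prefixed frame that arrives in four small chunks. CheckpointSketch and all of its members are hypothetical names for this illustration and are not part of Netty. It counts how often the length header is parsed with and without moving the rewind position past the header.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Plain-Java sketch of checkpointing. These names are illustrative, not Netty APIs. */
public class CheckpointSketch {

    /** Cached, stackless signal used when a read needs more bytes. */
    static final class Signal extends Error {
        static final Signal INSTANCE = new Signal();
        private Signal() { super(null, null, false, false); }
    }

    private final boolean useCheckpoint;
    private byte[] data = new byte[0];
    private int readerIndex;
    private int checkpoint;   // position to rewind to when a read fails
    private int length = -1;  // parsed header, kept across calls; -1 means "not read yet"
    int headerReads;          // how many times the header was parsed

    CheckpointSketch(boolean useCheckpoint) { this.useCheckpoint = useCheckpoint; }

    private int readInt() {
        if (data.length - readerIndex < 4) throw Signal.INSTANCE;
        int value = 0;
        for (int i = 0; i < 4; i++) value = (value << 8) | (data[readerIndex++] & 0xFF);
        return value;
    }

    private byte[] readBytes(int n) {
        if (data.length - readerIndex < n) throw Signal.INSTANCE;
        byte[] out = Arrays.copyOfRange(data, readerIndex, readerIndex + n);
        readerIndex += n;
        return out;
    }

    private byte[] decode() {
        if (!useCheckpoint || length < 0) {
            length = readInt();
            headerReads++;
            if (useCheckpoint) checkpoint = readerIndex; // header will not be replayed
        }
        byte[] frame = readBytes(length);
        length = -1; // the next call starts a new frame
        return frame;
    }

    /** Appends a chunk and returns one decoded frame, or null if more data is needed. */
    byte[] feed(byte[] chunk) {
        int oldLength = data.length;
        data = Arrays.copyOf(data, oldLength + chunk.length);
        System.arraycopy(chunk, 0, data, oldLength, chunk.length);
        checkpoint = readerIndex; // default 'initial' position for this attempt
        try {
            return decode();
        } catch (Signal signal) {
            readerIndex = checkpoint; // rewind to the last checkpoint
            return null;
        }
    }

    /** Feeds the frame "abc" in four chunks and reports how often the header was parsed. */
    static int headerReadsFor(boolean useCheckpoint) {
        CheckpointSketch decoder = new CheckpointSketch(useCheckpoint);
        decoder.feed(new byte[] {0, 0, 0, 3});
        decoder.feed(new byte[] {'a'});
        decoder.feed(new byte[] {'b'});
        byte[] frame = decoder.feed(new byte[] {'c'});
        if (!"abc".equals(new String(frame, StandardCharsets.US_ASCII))) {
            throw new IllegalStateException("unexpected frame");
        }
        return decoder.headerReads;
    }

    public static void main(String[] args) {
        System.out.println(headerReadsFor(false)); // prints 4
        System.out.println(headerReadsFor(true));  // prints 1
    }
}
```

Because the header is no longer replayed once the checkpoint has moved, its value has to live in a field (length here) rather than in a local variable.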
Calling checkpoint(T) with an Enum
Although you can just use the checkpoint() method and manage the state
of the decoder yourself, the easiest way to manage the state of the
decoder is to create an Enum type that represents the current state
of the decoder and to call the checkpoint(T) method whenever the state
changes. You can have as many states as you want, depending on the
complexity of the message you want to decode:
public enum MyDecoderState {
    READ_LENGTH,
    READ_CONTENT;
}
public class IntegerHeaderFrameDecoder
        extends ReplayingDecoder<MyDecoderState> {
    private int length;
    public IntegerHeaderFrameDecoder() {
        // Set the initial state.
        super(MyDecoderState.READ_LENGTH);
    }
    @Override
    protected Object decode(ChannelHandlerContext ctx,
                            Channel channel,
                            ChannelBuffer buf,
                            MyDecoderState state) throws Exception {
        switch (state) {
        case READ_LENGTH:
            length = buf.readInt();
            checkpoint(MyDecoderState.READ_CONTENT);
            // Intentional fall-through: continue with the content right away.
        case READ_CONTENT:
            ChannelBuffer frame = buf.readBytes(length);
            checkpoint(MyDecoderState.READ_LENGTH);
            return frame;
        default:
            throw new Error("Shouldn't reach here.");
        }
    }
}
Calling checkpoint() with no parameter
An alternative way to manage the decoder state is to manage it by yourself.
public class IntegerHeaderFrameDecoder
        extends ReplayingDecoder<VoidEnum> {
    private boolean readLength;
    private int length;
    @Override
    protected Object decode(ChannelHandlerContext ctx,
                            Channel channel,
                            ChannelBuffer buf,
                            VoidEnum state) throws Exception {
        if (!readLength) {
            length = buf.readInt();
            readLength = true;
            checkpoint();
        }
        // The length is known here, either from this call or from an earlier one.
        ChannelBuffer frame = buf.readBytes(length);
        readLength = false;
        checkpoint();
        return frame;
    }
}
Replacing a decoder with another decoder in a pipeline
If you are going to write a protocol multiplexer, you will probably want to
replace a ReplayingDecoder (protocol detector) with another
ReplayingDecoder or FrameDecoder (actual protocol decoder).
It is not possible to achieve this simply by calling
ChannelPipeline.replace(ChannelHandler, String, ChannelHandler), but
some additional steps are required:
public class FirstDecoder extends ReplayingDecoder<VoidEnum> {
    public FirstDecoder() {
        super(true); // Enable unfold
    }
    @Override
    protected Object decode(ChannelHandlerContext ctx,
                            Channel ch,
                            ChannelBuffer buf,
                            VoidEnum state) {
        ...
        // Decode the first message
        Object firstMessage = ...;
        // Add the second decoder
        ctx.getPipeline().addLast("second", new SecondDecoder());
        // Remove the first decoder (me)
        ctx.getPipeline().remove(this);
        if (buf.readable()) {
            // Hand off the remaining data to the second decoder
            return new Object[] { firstMessage, buf.readBytes(super.actualReadableBytes()) };
        } else {
            // Nothing to hand off
            return firstMessage;
        }
    }
}
Nested Class Summary
Nested classes/interfaces inherited from interface org.jboss.netty.channel.ChannelHandler
ChannelHandler.Sharable
Field Summary
Fields
- private int checkpoint
- private boolean needsCleanup
- private final ReplayingDecoderBuffer replayable
- private T state
Fields inherited from class org.jboss.netty.handler.codec.frame.FrameDecoder
cumulation, DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS
Constructor Summary
Constructors
- protected ReplayingDecoder(): Creates a new instance with no initial state (i.e: null).
- protected ReplayingDecoder(boolean unfold)
- protected ReplayingDecoder(T initialState): Creates a new instance with the specified initial state.
- protected ReplayingDecoder(T initialState, boolean unfold)
Method Summary
Methods
- private void callDecode(ChannelHandlerContext context, Channel channel, ChannelBuffer input, ChannelBuffer replayableInput, SocketAddress remoteAddress)
- protected void checkpoint(): Stores the internal cumulative buffer's reader position.
- protected void checkpoint(T state): Stores the internal cumulative buffer's reader position and updates the current decoder state.
- protected void cleanup(..)
- protected final Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer)
- protected abstract Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer, T state): Decodes the received packets so far into a frame.
- protected final Object decodeLast(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer): Decodes the received data so far into a frame when the channel is disconnected.
- protected Object decodeLast(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer, T state): Decodes the received data so far into a frame when the channel is disconnected.
- protected T getState(): Returns the current state of this decoder.
- protected ChannelBuffer internalBuffer(): Returns the internal cumulative buffer of this decoder.
- void messageReceived(..): Invoked when a message object (e.g: ChannelBuffer) was received from a remote peer.
- protected T setState(..): Sets the current state of this decoder.
Methods inherited from class org.jboss.netty.handler.codec.frame.FrameDecoder
actualReadableBytes, afterAdd, afterRemove, appendToCumulation, beforeAdd, beforeRemove, channelClosed, channelDisconnected, exceptionCaught, extractFrame, getMaxCumulationBufferCapacity, getMaxCumulationBufferComponents, isUnfold, newCumulationBuffer, replace, setMaxCumulationBufferCapacity, setMaxCumulationBufferComponents, setUnfold, unfoldAndFireMessageReceived, updateCumulation
Methods inherited from class org.jboss.netty.channel.SimpleChannelUpstreamHandler
channelBound, channelConnected, channelInterestChanged, channelOpen, channelUnbound, childChannelClosed, childChannelOpen, handleUpstream, writeComplete
Field Details
replayable
private final ReplayingDecoderBuffer replayable
state
private T state
checkpoint
private int checkpoint
needsCleanup
private boolean needsCleanup
Constructor Details
ReplayingDecoder
protected ReplayingDecoder()
Creates a new instance with no initial state (i.e: null).
ReplayingDecoder
protected ReplayingDecoder(boolean unfold)
ReplayingDecoder
protected ReplayingDecoder(T initialState)
Creates a new instance with the specified initial state.
ReplayingDecoder
protected ReplayingDecoder(T initialState, boolean unfold)
Method Details
internalBuffer
protected ChannelBuffer internalBuffer()
Description copied from class: FrameDecoder
Returns the internal cumulative buffer of this decoder. You usually do not need to access the internal buffer directly to write a decoder. Use it only when you must, and at your own risk.
- Overrides:
internalBuffer in class FrameDecoder
checkpoint
protected void checkpoint()
Stores the internal cumulative buffer's reader position.
checkpoint
protected void checkpoint(T state)
Stores the internal cumulative buffer's reader position and updates the current decoder state.
getState
protected T getState()
Returns the current state of this decoder.
- Returns:
the current state of this decoder
setState
Sets the current state of this decoder.
- Returns:
the old state of this decoder
decode
protected abstract Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer, T state) throws Exception
Decodes the received packets so far into a frame.
- Parameters:
ctx - the context of this handler
channel - the current channel
buffer - the cumulative buffer of received packets so far. Note that the buffer might be empty, so your decoder implementation should not assume that it contains at least one byte.
state - the current decoder state (null if unused)
- Returns:
the decoded frame
- Throws:
Exception
decodeLast
protected Object decodeLast(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer, T state) throws Exception
Decodes the received data so far into a frame when the channel is disconnected.
- Parameters:
ctx - the context of this handler
channel - the current channel
buffer - the cumulative buffer of received packets so far. Note that the buffer might be empty, so your decoder implementation should not assume that it contains at least one byte.
state - the current decoder state (null if unused)
- Returns:
the decoded frame
- Throws:
Exception
decode
protected final Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception
Calls decode(ChannelHandlerContext, Channel, ChannelBuffer, Enum). This method should never be used by ReplayingDecoder itself, but it is handled anyway to be safe.
- Specified by:
decode in class FrameDecoder
- Parameters:
ctx - the context of this handler
channel - the current channel
buffer - the cumulative buffer of received packets so far. Note that the buffer might be empty, so your decoder implementation should not assume that it contains at least one byte.
- Returns:
the decoded frame if a full frame was received and decoded. null if there's not enough data in the buffer to decode a frame.
- Throws:
Exception
decodeLast
protected final Object decodeLast(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception
Description copied from class: FrameDecoder
Decodes the received data so far into a frame when the channel is disconnected.
- Overrides:
decodeLast in class FrameDecoder
- Parameters:
ctx - the context of this handler
channel - the current channel
buffer - the cumulative buffer of received packets so far. Note that the buffer might be empty, so your decoder implementation should not assume that it contains at least one byte.
- Returns:
the decoded frame if a full frame was received and decoded. null if there's not enough data in the buffer to decode a frame.
- Throws:
Exception
messageReceived
Description copied from class: SimpleChannelUpstreamHandler
Invoked when a message object (e.g: ChannelBuffer) was received from a remote peer.
- Overrides:
messageReceived in class FrameDecoder
- Throws:
Exception
callDecode
private void callDecode(ChannelHandlerContext context, Channel channel, ChannelBuffer input, ChannelBuffer replayableInput, SocketAddress remoteAddress) throws Exception
- Throws:
Exception
cleanup
Description copied from class: FrameDecoder
Gets called on FrameDecoder.channelDisconnected(ChannelHandlerContext, ChannelStateEvent) and FrameDecoder.channelClosed(ChannelHandlerContext, ChannelStateEvent)
- Overrides:
cleanup in class FrameDecoder
- Throws:
Exception