Class MemoryAwareThreadPoolExecutor
- java.lang.Object
  - java.util.concurrent.AbstractExecutorService
    - java.util.concurrent.ThreadPoolExecutor
      - org.jboss.netty.handler.execution.MemoryAwareThreadPoolExecutor
- All Implemented Interfaces:
java.util.concurrent.Executor, java.util.concurrent.ExecutorService
- Direct Known Subclasses:
FairOrderedMemoryAwareThreadPoolExecutor, OrderedMemoryAwareThreadPoolExecutor
public class MemoryAwareThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor

A ThreadPoolExecutor which blocks task submission when there are too many tasks in the queue. Both per-Channel and per-Executor limits can be applied.

When a task (i.e. a Runnable) is submitted, MemoryAwareThreadPoolExecutor calls ObjectSizeEstimator.estimateSize(Object) to get the estimated size of the task in bytes, and uses it to calculate the amount of memory occupied by the unprocessed tasks.

If the total size of the unprocessed tasks exceeds either the per-Channel or the per-Executor threshold, any further execute(Runnable) call will block until the tasks in the queue are processed so that the total size goes under the threshold.

Using an alternative task size estimation strategy

Although the default implementation does its best to guess the size of an object of unknown type, it is always a good idea to use an alternative ObjectSizeEstimator implementation instead of the DefaultObjectSizeEstimator to avoid incorrect task size calculation, especially when:
- you are using MemoryAwareThreadPoolExecutor independently from ExecutionHandler,
- you are submitting a task whose type is not ChannelEventRunnable, or
- the message type of the MessageEvent in the ChannelEventRunnable is not ChannelBuffer.

The following example shows an ObjectSizeEstimator which understands a user-defined object:

    public class MyRunnable implements Runnable {

        // Package-private so that MyObjectSizeEstimator can read its length.
        final byte[] data;

        public MyRunnable(byte[] data) {
            this.data = data;
        }

        public void run() {
            // Process 'data' ..
        }
    }

    public class MyObjectSizeEstimator extends DefaultObjectSizeEstimator {

        @Override
        public int estimateSize(Object o) {
            if (o instanceof MyRunnable) {
                return ((MyRunnable) o).data.length + 8;
            }
            return super.estimateSize(o);
        }
    }

    ThreadPoolExecutor pool = new MemoryAwareThreadPoolExecutor(
            16, 65536, 1048576, 30, TimeUnit.SECONDS,
            new MyObjectSizeEstimator(),
            Executors.defaultThreadFactory());

    pool.execute(new MyRunnable(data));

Event execution order

Please note that this executor does not maintain the order of the ChannelEvents for the same Channel. For example, you can even receive a "channelClosed" event before a "messageReceived" event. The events can be processed as depicted below:

    --------------------------------> Timeline -------------------------------->

    Thread X: --- Channel A (Event 1) --- Channel A (Event 2) --------------------------->

    Thread Y: --- Channel A (Event 3) --- Channel B (Event 2) --- Channel B (Event 3) --->

    Thread Z: --- Channel B (Event 1) --- Channel B (Event 4) --- Channel A (Event 4) --->

To maintain the event order, you must use OrderedMemoryAwareThreadPoolExecutor.
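The difference between unordered execution and per-channel ordering can be sketched with the standard library alone. The following is a minimal illustration and not Netty's implementation: KeyedSerialExecutor, OrderingDemo and the string key are hypothetical stand-ins for an ordered executor and a Channel. The sketch queues tasks per key and lets at most one pool thread drain a given key at a time, so tasks of one key run in submission order while different keys may still run in parallel.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: tasks that share a key run one at a time, in submission order.
final class KeyedSerialExecutor {
    private final Executor backend;
    private final Map<String, Queue<Runnable>> queues = new HashMap<>();

    KeyedSerialExecutor(Executor backend) {
        this.backend = backend;
    }

    synchronized void execute(String key, Runnable task) {
        Queue<Runnable> queue = queues.get(key);
        if (queue == null) {
            // No thread is draining this key yet: create its queue and start a drainer.
            queue = new ArrayDeque<>();
            queues.put(key, queue);
            queue.add(task);
            backend.execute(() -> drain(key));
        } else {
            // A drainer is already active for this key and will pick the task up.
            queue.add(task);
        }
    }

    private void drain(String key) {
        for (;;) {
            Runnable next;
            synchronized (this) {
                next = queues.get(key).poll();
                if (next == null) {
                    queues.remove(key); // the next submission starts a fresh drainer
                    return;
                }
            }
            try {
                next.run();
            } catch (RuntimeException e) {
                // A real implementation would log this; the sketch keeps draining.
            }
        }
    }
}

final class OrderingDemo {
    // Submits 'count' numbered tasks for one key and returns the order in which they ran.
    static List<Integer> runInOrder(int count) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        KeyedSerialExecutor ordered = new KeyedSerialExecutor(pool);
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        for (int i = 0; i < count; i++) {
            final int n = i;
            ordered.execute("channel-A", () -> seen.add(n));
        }
        pool.shutdown(); // already submitted drainers still run to completion
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }
}
```

Submitting the same tasks directly to the four-thread pool gives no such guarantee, which is the situation the timeline above depicts.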
-
-
Nested Class Summary
Nested Classes (Modifier and Type, Class):
- private static class MemoryAwareThreadPoolExecutor.Limiter
- private static class MemoryAwareThreadPoolExecutor.MemoryAwareRunnable
- private static class MemoryAwareThreadPoolExecutor.NewThreadRunsPolicy
- private static class MemoryAwareThreadPoolExecutor.Settings
Nested classes/interfaces inherited from class java.util.concurrent.ThreadPoolExecutor
java.util.concurrent.ThreadPoolExecutor.AbortPolicy, java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy, java.util.concurrent.ThreadPoolExecutor.DiscardOldestPolicy, java.util.concurrent.ThreadPoolExecutor.DiscardPolicy
-
-
Field Summary
Fields (Modifier and Type, Field):
- private java.util.concurrent.ConcurrentMap<Channel,java.util.concurrent.atomic.AtomicLong> channelCounters
- private static InternalLogger logger
- private static SharedResourceMisuseDetector misuseDetector
- private boolean notifyOnShutdown
- private MemoryAwareThreadPoolExecutor.Settings settings
- private MemoryAwareThreadPoolExecutor.Limiter totalLimiter
-
Constructor Summary
Constructors (Constructor, Description):
- MemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize)
  Creates a new instance.
- MemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, java.util.concurrent.TimeUnit unit)
  Creates a new instance.
- MemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, java.util.concurrent.TimeUnit unit, java.util.concurrent.ThreadFactory threadFactory)
  Creates a new instance.
- MemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, java.util.concurrent.TimeUnit unit, ObjectSizeEstimator objectSizeEstimator, java.util.concurrent.ThreadFactory threadFactory)
  Creates a new instance.
-
Method Summary
Methods (Modifier and Type, Method, Description):
- protected void beforeExecute(java.lang.Thread t, java.lang.Runnable r)
- protected void decreaseCounter(java.lang.Runnable task)
- protected void doExecute(java.lang.Runnable task)
  Put the actual execution logic here.
- protected void doUnorderedExecute(java.lang.Runnable task)
  Executes the specified task without maintaining the event order.
- void execute(java.lang.Runnable command)
- private java.util.concurrent.atomic.AtomicLong getChannelCounter(Channel channel)
- long getMaxChannelMemorySize()
  Returns the maximum total size of the queued events per channel.
- long getMaxTotalMemorySize()
  Returns the maximum total size of the queued events for this pool.
- boolean getNotifyChannelFuturesOnShutdown()
  Returns whether the ChannelFutures of the ChannelEventRunnables should be notified about the shutdown of this MemoryAwareThreadPoolExecutor.
- ObjectSizeEstimator getObjectSizeEstimator()
  Returns the ObjectSizeEstimator of this pool.
- protected void increaseCounter(java.lang.Runnable task)
- boolean remove(java.lang.Runnable task)
- void setMaxChannelMemorySize(long maxChannelMemorySize)
  Sets the maximum total size of the queued events per channel.
- void setNotifyChannelFuturesOnShutdown(boolean notifyOnShutdown)
  If set to false, no queued ChannelEventRunnable's ChannelFuture will get notified once shutdownNow() is called.
- void setObjectSizeEstimator(ObjectSizeEstimator objectSizeEstimator)
  Sets the ObjectSizeEstimator of this pool.
- protected boolean shouldCount(java.lang.Runnable task)
  Returns true if and only if the specified task should be counted to limit the global and per-channel memory consumption.
- java.util.List<java.lang.Runnable> shutdownNow()
  This will call shutdownNow(boolean) with the value of getNotifyChannelFuturesOnShutdown().
- java.util.List<java.lang.Runnable> shutdownNow(boolean notify)
  See ThreadPoolExecutor.shutdownNow() for how it handles the shutdown.
- protected void terminated()
Methods inherited from class java.util.concurrent.ThreadPoolExecutor
afterExecute, allowCoreThreadTimeOut, allowsCoreThreadTimeOut, awaitTermination, finalize, getActiveCount, getCompletedTaskCount, getCorePoolSize, getKeepAliveTime, getLargestPoolSize, getMaximumPoolSize, getPoolSize, getQueue, getRejectedExecutionHandler, getTaskCount, getThreadFactory, isShutdown, isTerminated, isTerminating, prestartAllCoreThreads, prestartCoreThread, purge, setCorePoolSize, setKeepAliveTime, setMaximumPoolSize, setRejectedExecutionHandler, setThreadFactory, shutdown, toString
-
-
-
-
Field Detail
-
logger
private static final InternalLogger logger
-
misuseDetector
private static final SharedResourceMisuseDetector misuseDetector
-
settings
private volatile MemoryAwareThreadPoolExecutor.Settings settings
-
channelCounters
private final java.util.concurrent.ConcurrentMap<Channel,java.util.concurrent.atomic.AtomicLong> channelCounters
-
totalLimiter
private final MemoryAwareThreadPoolExecutor.Limiter totalLimiter
-
notifyOnShutdown
private volatile boolean notifyOnShutdown
-
-
Constructor Detail
-
MemoryAwareThreadPoolExecutor
public MemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize)
Creates a new instance.
- Parameters:
corePoolSize - the maximum number of active threads
maxChannelMemorySize - the maximum total size of the queued events per channel. Specify 0 to disable.
maxTotalMemorySize - the maximum total size of the queued events for this pool. Specify 0 to disable.
-
MemoryAwareThreadPoolExecutor
public MemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, java.util.concurrent.TimeUnit unit)
Creates a new instance.
- Parameters:
corePoolSize - the maximum number of active threads
maxChannelMemorySize - the maximum total size of the queued events per channel. Specify 0 to disable.
maxTotalMemorySize - the maximum total size of the queued events for this pool. Specify 0 to disable.
keepAliveTime - the amount of time for an inactive thread to shut itself down
unit - the TimeUnit of keepAliveTime
-
MemoryAwareThreadPoolExecutor
public MemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, java.util.concurrent.TimeUnit unit, java.util.concurrent.ThreadFactory threadFactory)
Creates a new instance.
- Parameters:
corePoolSize - the maximum number of active threads
maxChannelMemorySize - the maximum total size of the queued events per channel. Specify 0 to disable.
maxTotalMemorySize - the maximum total size of the queued events for this pool. Specify 0 to disable.
keepAliveTime - the amount of time for an inactive thread to shut itself down
unit - the TimeUnit of keepAliveTime
threadFactory - the ThreadFactory of this pool
-
MemoryAwareThreadPoolExecutor
public MemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, java.util.concurrent.TimeUnit unit, ObjectSizeEstimator objectSizeEstimator, java.util.concurrent.ThreadFactory threadFactory)
Creates a new instance.
- Parameters:
corePoolSize - the maximum number of active threads
maxChannelMemorySize - the maximum total size of the queued events per channel. Specify 0 to disable.
maxTotalMemorySize - the maximum total size of the queued events for this pool. Specify 0 to disable.
keepAliveTime - the amount of time for an inactive thread to shut itself down
unit - the TimeUnit of keepAliveTime
objectSizeEstimator - the ObjectSizeEstimator of this pool
threadFactory - the ThreadFactory of this pool
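How a byte threshold such as maxChannelMemorySize or maxTotalMemorySize turns into back-pressure can be illustrated with a small stand-alone sketch. ByteLimiter and BackPressureDemo below are hypothetical and are not the private Limiter class of this executor. The sketch only assumes the behavior described in the class documentation: a submitter blocks while the counted size exceeds the limit, and a limit of 0 disables the check. The exact comparison used by the real class may differ.

```java
// Hypothetical sketch of a byte-based limiter; not Netty's internal Limiter.
final class ByteLimiter {
    private final long limit; // 0 disables the limit
    private long used;

    ByteLimiter(long limit) {
        if (limit < 0) {
            throw new IllegalArgumentException("limit: " + limit);
        }
        this.limit = limit;
    }

    // True if a submitter calling increase() right now would have to wait.
    synchronized boolean wouldBlock() {
        return limit != 0 && used > limit;
    }

    // Waits while the total is over the limit, then adds the size of the new task.
    synchronized void increase(long size) {
        boolean interrupted = false;
        while (limit != 0 && used > limit) {
            try {
                wait();
            } catch (InterruptedException e) {
                interrupted = true; // keep waiting, restore the flag afterwards
            }
        }
        used += size;
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    // Called when a task has been processed; wakes up blocked submitters.
    synchronized void decrease(long size) {
        used -= size;
        notifyAll();
    }

    synchronized long used() {
        return used;
    }
}

final class BackPressureDemo {
    // Returns true if a submitter blocked while over the limit and resumed after release.
    static boolean blocksUntilReleased() {
        ByteLimiter limiter = new ByteLimiter(100);
        limiter.increase(120); // admitted, but the total is now over the limit
        Thread submitter = new Thread(() -> limiter.increase(10));
        submitter.start();
        try {
            submitter.join(200); // the submitter should still be waiting
            boolean blocked = submitter.isAlive();
            limiter.decrease(120); // the queued task was processed
            submitter.join(5000);
            return blocked && !submitter.isAlive() && limiter.used() == 10;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

In this sketch a single large task is still admitted when the limiter is under its threshold, so the limit bounds when submitters start to wait, not the absolute peak.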
-
-
Method Detail
-
terminated
protected void terminated()
- Overrides:
terminated in class java.util.concurrent.ThreadPoolExecutor
-
shutdownNow
public java.util.List<java.lang.Runnable> shutdownNow()
This will call shutdownNow(boolean) with the value of getNotifyChannelFuturesOnShutdown().
- Specified by:
shutdownNow in interface java.util.concurrent.ExecutorService
- Overrides:
shutdownNow in class java.util.concurrent.ThreadPoolExecutor
-
shutdownNow
public java.util.List<java.lang.Runnable> shutdownNow(boolean notify)
See ThreadPoolExecutor.shutdownNow() for how it handles the shutdown. If true is given to this method, it also notifies all ChannelFutures of the ChannelEventRunnables that were not executed.

Be aware that if you call this with false, you will need to handle the notification of the ChannelFutures yourself. So only use this if you really have a use case for it.
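The notification step can be pictured with plain java.util.concurrent types. In the sketch below, CompletableFuture is only a stand-in for ChannelFuture, and NotifyingTask and ShutdownDemo are hypothetical names. It relies on the documented contract of ThreadPoolExecutor.shutdownNow(), which returns the tasks that never started, and fails their futures when notify is true.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical task that carries a future, standing in for a ChannelEventRunnable.
final class NotifyingTask implements Runnable {
    final CompletableFuture<Void> future = new CompletableFuture<>();
    private final Runnable body;

    NotifyingTask(Runnable body) {
        this.body = body;
    }

    @Override
    public void run() {
        body.run();
        future.complete(null);
    }
}

final class ShutdownDemo {
    // Shuts the pool down and, if requested, fails the futures of tasks that never ran.
    static List<Runnable> shutdownNow(ExecutorService pool, boolean notify) {
        List<Runnable> pending = pool.shutdownNow();
        if (notify) {
            for (Runnable r : pending) {
                if (r instanceof NotifyingTask) {
                    ((NotifyingTask) r).future.completeExceptionally(
                            new IllegalStateException("executor was shut down"));
                }
            }
        }
        return pending;
    }

    // Returns true if a task still waiting in the queue had its future failed on shutdown.
    static boolean queuedTaskIsFailed() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch neverReleased = new CountDownLatch(1);
        // Occupy the only worker so that the next task has to stay in the queue.
        pool.execute(() -> {
            started.countDown();
            try {
                neverReleased.await();
            } catch (InterruptedException e) {
                // Interrupted by shutdownNow(); simply end the task.
            }
        });
        try {
            started.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        NotifyingTask queued = new NotifyingTask(() -> { });
        pool.execute(queued);
        List<Runnable> pending = shutdownNow(pool, true);
        return pending.size() == 1 && queued.future.isCompletedExceptionally();
    }
}
```

With notify set to false the returned list is the only trace of the unexecuted tasks, so the caller becomes responsible for completing their futures.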
-
getObjectSizeEstimator
public ObjectSizeEstimator getObjectSizeEstimator()
Returns the ObjectSizeEstimator of this pool.
-
setObjectSizeEstimator
public void setObjectSizeEstimator(ObjectSizeEstimator objectSizeEstimator)
Sets the ObjectSizeEstimator of this pool.
-
getMaxChannelMemorySize
public long getMaxChannelMemorySize()
Returns the maximum total size of the queued events per channel.
-
setMaxChannelMemorySize
public void setMaxChannelMemorySize(long maxChannelMemorySize)
Sets the maximum total size of the queued events per channel. Specify 0 to disable.
-
getMaxTotalMemorySize
public long getMaxTotalMemorySize()
Returns the maximum total size of the queued events for this pool.
-
setNotifyChannelFuturesOnShutdown
public void setNotifyChannelFuturesOnShutdown(boolean notifyOnShutdown)
If set to false, no queued ChannelEventRunnable's ChannelFuture will get notified once shutdownNow() is called. If set to true, every queued ChannelEventRunnable will get marked as failed via ChannelFuture.setFailure(Throwable).

Please only set this to false if you want to handle the notification yourself and know what you are doing. Default is true.
-
getNotifyChannelFuturesOnShutdown
public boolean getNotifyChannelFuturesOnShutdown()
Returns whether the ChannelFutures of the ChannelEventRunnables should be notified about the shutdown of this MemoryAwareThreadPoolExecutor.
-
execute
public void execute(java.lang.Runnable command)
- Specified by:
execute in interface java.util.concurrent.Executor
- Overrides:
execute in class java.util.concurrent.ThreadPoolExecutor
-
doExecute
protected void doExecute(java.lang.Runnable task)
Put the actual execution logic here. The default implementation simply calls doUnorderedExecute(Runnable).
-
doUnorderedExecute
protected final void doUnorderedExecute(java.lang.Runnable task)
Executes the specified task without maintaining the event order.
-
remove
public boolean remove(java.lang.Runnable task)
- Overrides:
remove in class java.util.concurrent.ThreadPoolExecutor
-
beforeExecute
protected void beforeExecute(java.lang.Thread t, java.lang.Runnable r)
- Overrides:
beforeExecute in class java.util.concurrent.ThreadPoolExecutor
-
increaseCounter
protected void increaseCounter(java.lang.Runnable task)
-
decreaseCounter
protected void decreaseCounter(java.lang.Runnable task)
-
getChannelCounter
private java.util.concurrent.atomic.AtomicLong getChannelCounter(Channel channel)
-
shouldCount
protected boolean shouldCount(java.lang.Runnable task)
Returns true if and only if the specified task should be counted to limit the global and per-channel memory consumption. When you override this method, you must call super.shouldCount() to make sure important tasks are not counted.
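The override pattern described here can be sketched as follows. CountingBase is a hypothetical stand-in that models only the shouldCount hook, and its Internal marker interface and the HeartbeatTask type are invented for illustration; which tasks the real superclass excludes is not stated in this page. The point of the sketch is the order of the checks: the subclass consults super.shouldCount() first and only then applies its own exclusion.

```java
// Stand-in for the executor; only the shouldCount hook is modelled.
class CountingBase {
    // Hypothetical marker for tasks the base class never counts.
    interface Internal extends Runnable { }

    protected boolean shouldCount(Runnable task) {
        return !(task instanceof Internal);
    }
}

// Hypothetical lightweight task that the subclass does not want to count.
final class HeartbeatTask implements Runnable {
    @Override
    public void run() {
        // Nothing to do in the sketch.
    }
}

class SkipHeartbeats extends CountingBase {
    @Override
    protected boolean shouldCount(Runnable task) {
        // Ask the superclass first so that its own exclusions stay in effect.
        if (!super.shouldCount(task)) {
            return false;
        }
        return !(task instanceof HeartbeatTask);
    }
}
```

Returning true without consulting the superclass would start counting tasks that the base class deliberately leaves out of the memory accounting.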
-
-