Class MemoryAwareThreadPoolExecutor
- All Implemented Interfaces:
Executor,ExecutorService
- Direct Known Subclasses:
FairOrderedMemoryAwareThreadPoolExecutor,OrderedMemoryAwareThreadPoolExecutor
ThreadPoolExecutor which blocks the task submission when there's
too many tasks in the queue. Both per-Channel and per-Executor
limitation can be applied.
When a task (i.e. Runnable) is submitted,
MemoryAwareThreadPoolExecutor calls ObjectSizeEstimator.estimateSize(Object)
to get the estimated size of the task in bytes to calculate the amount of
memory occupied by the unprocessed tasks.
If the total size of the unprocessed tasks exceeds either per-Channel
or per-Executor threshold, any further execute(Runnable)
call will block until the tasks in the queue are processed so that the total
size goes under the threshold.
Using an alternative task size estimation strategy
Although the default implementation does its best to guess the size of an object of unknown type, it is always good idea to to use an alternativeObjectSizeEstimator implementation instead of the
DefaultObjectSizeEstimator to avoid incorrect task size calculation,
especially when:
- you are using
MemoryAwareThreadPoolExecutorindependently fromExecutionHandler, - you are submitting a task whose type is not
ChannelEventRunnable, or - the message type of the
MessageEventin theChannelEventRunnableis notChannelBuffer.
ObjectSizeEstimator
which understands a user-defined object:
public class MyRunnable implementsRunnable{ private final byte[] data; public MyRunnable(byte[] data) { this.data = data; } public void run() { // Process 'data' .. } } public class MyObjectSizeEstimator extendsDefaultObjectSizeEstimator{ @Override public int estimateSize(Object o) { if (o instanceof MyRunnable) { return ((MyRunnable) o).data.length + 8; } return super.estimateSize(o); } }ThreadPoolExecutorpool = newMemoryAwareThreadPoolExecutor( 16, 65536, 1048576, 30,TimeUnit.SECONDS, new MyObjectSizeEstimator(),Executors.defaultThreadFactory()); pool.execute(new MyRunnable(data));
Event execution order
Please note that this executor does not maintain the order of theChannelEvents for the same Channel. For example,
you can even receive a "channelClosed" event before a
"messageReceived" event, as depicted by the following diagram.
For example, the events can be processed as depicted below:
--------------------------------> Timeline -------------------------------->
Thread X: --- Channel A (Event 1) --- Channel A (Event 2) --------------------------->
Thread Y: --- Channel A (Event 3) --- Channel B (Event 2) --- Channel B (Event 3) --->
Thread Z: --- Channel B (Event 1) --- Channel B (Event 4) --- Channel A (Event 4) --->
To maintain the event order, you must use OrderedMemoryAwareThreadPoolExecutor.-
Nested Class Summary
Nested ClassesModifier and TypeClassDescriptionprivate static classprivate static final classprivate static final classprivate static final classNested classes/interfaces inherited from class java.util.concurrent.ThreadPoolExecutor
ThreadPoolExecutor.AbortPolicy, ThreadPoolExecutor.CallerRunsPolicy, ThreadPoolExecutor.DiscardOldestPolicy, ThreadPoolExecutor.DiscardPolicy -
Field Summary
FieldsModifier and TypeFieldDescriptionprivate final ConcurrentMap<Channel, AtomicLong> private static final InternalLoggerprivate static final SharedResourceMisuseDetectorprivate booleanprivate final MemoryAwareThreadPoolExecutor.Limiter -
Constructor Summary
ConstructorsConstructorDescriptionMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize) Creates a new instance.MemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit) Creates a new instance.MemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) Creates a new instance.MemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit, ObjectSizeEstimator objectSizeEstimator, ThreadFactory threadFactory) Creates a new instance. -
Method Summary
Modifier and TypeMethodDescriptionprotected voidbeforeExecute(Thread t, Runnable r) protected voiddecreaseCounter(Runnable task) protected voidPut the actual execution logic here.protected final voiddoUnorderedExecute(Runnable task) Executes the specified task without maintaining the event order.voidprivate AtomicLonggetChannelCounter(Channel channel) longReturns the maximum total size of the queued events per channel.longReturns the maximum total size of the queued events for this pool.booleanReturns if theChannelFuture's of theChannelEventRunnable's should be notified about the shutdown of thisMemoryAwareThreadPoolExecutor.Returns theObjectSizeEstimatorof this pool.protected voidincreaseCounter(Runnable task) booleanvoidsetMaxChannelMemorySize(long maxChannelMemorySize) Sets the maximum total size of the queued events per channel.voidsetNotifyChannelFuturesOnShutdown(boolean notifyOnShutdown) If set tofalseno queuedChannelEventRunnable'sChannelFuturewill get notified onceshutdownNow()is called.voidsetObjectSizeEstimator(ObjectSizeEstimator objectSizeEstimator) Sets theObjectSizeEstimatorof this pool.protected booleanshouldCount(Runnable task) Returnstrueif and only if the specifiedtaskshould be counted to limit the global and per-channel memory consumption.This will callshutdownNow(boolean)with the value ofgetNotifyChannelFuturesOnShutdown().shutdownNow(boolean notify) SeeThreadPoolExecutor.shutdownNow()for how it handles the shutdown.protected voidMethods inherited from class java.util.concurrent.ThreadPoolExecutor
afterExecute, allowCoreThreadTimeOut, allowsCoreThreadTimeOut, awaitTermination, finalize, getActiveCount, getCompletedTaskCount, getCorePoolSize, getKeepAliveTime, getLargestPoolSize, getMaximumPoolSize, getPoolSize, getQueue, getRejectedExecutionHandler, getTaskCount, getThreadFactory, isShutdown, isTerminated, isTerminating, prestartAllCoreThreads, prestartCoreThread, purge, setCorePoolSize, setKeepAliveTime, setMaximumPoolSize, setRejectedExecutionHandler, setThreadFactory, shutdown, toStringMethods inherited from class java.util.concurrent.AbstractExecutorService
invokeAll, invokeAll, invokeAny, invokeAny, newTaskFor, newTaskFor, submit, submit, submit
-
Field Details
-
logger
-
misuseDetector
-
settings
-
channelCounters
-
totalLimiter
-
notifyOnShutdown
private volatile boolean notifyOnShutdown
-
-
Constructor Details
-
MemoryAwareThreadPoolExecutor
public MemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize) Creates a new instance.- Parameters:
corePoolSize- the maximum number of active threadsmaxChannelMemorySize- the maximum total size of the queued events per channel. Specify0to disable.maxTotalMemorySize- the maximum total size of the queued events for this pool Specify0to disable.
-
MemoryAwareThreadPoolExecutor
public MemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit) Creates a new instance.- Parameters:
corePoolSize- the maximum number of active threadsmaxChannelMemorySize- the maximum total size of the queued events per channel. Specify0to disable.maxTotalMemorySize- the maximum total size of the queued events for this pool Specify0to disable.keepAliveTime- the amount of time for an inactive thread to shut itself downunit- theTimeUnitofkeepAliveTime
-
MemoryAwareThreadPoolExecutor
public MemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) Creates a new instance.- Parameters:
corePoolSize- the maximum number of active threadsmaxChannelMemorySize- the maximum total size of the queued events per channel. Specify0to disable.maxTotalMemorySize- the maximum total size of the queued events for this pool Specify0to disable.keepAliveTime- the amount of time for an inactive thread to shut itself downunit- theTimeUnitofkeepAliveTimethreadFactory- theThreadFactoryof this pool
-
MemoryAwareThreadPoolExecutor
public MemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit, ObjectSizeEstimator objectSizeEstimator, ThreadFactory threadFactory) Creates a new instance.- Parameters:
corePoolSize- the maximum number of active threadsmaxChannelMemorySize- the maximum total size of the queued events per channel. Specify0to disable.maxTotalMemorySize- the maximum total size of the queued events for this pool Specify0to disable.keepAliveTime- the amount of time for an inactive thread to shut itself downunit- theTimeUnitofkeepAliveTimeobjectSizeEstimator- theObjectSizeEstimatorof this poolthreadFactory- theThreadFactoryof this pool
-
-
Method Details
-
terminated
protected void terminated()- Overrides:
terminatedin classThreadPoolExecutor
-
shutdownNow
This will callshutdownNow(boolean)with the value ofgetNotifyChannelFuturesOnShutdown().- Specified by:
shutdownNowin interfaceExecutorService- Overrides:
shutdownNowin classThreadPoolExecutor
-
shutdownNow
SeeThreadPoolExecutor.shutdownNow()for how it handles the shutdown. Iftrueis given to this method it also notifies allChannelFuture's of the not executedChannelEventRunnable's.Be aware that if you call this with
falseyou will need to handle the notification of theChannelFuture's by your self. So only use this if you really have a use-case for it. -
getObjectSizeEstimator
Returns theObjectSizeEstimatorof this pool. -
setObjectSizeEstimator
Sets theObjectSizeEstimatorof this pool. -
getMaxChannelMemorySize
public long getMaxChannelMemorySize()Returns the maximum total size of the queued events per channel. -
setMaxChannelMemorySize
public void setMaxChannelMemorySize(long maxChannelMemorySize) Sets the maximum total size of the queued events per channel. Specify0to disable. -
getMaxTotalMemorySize
public long getMaxTotalMemorySize()Returns the maximum total size of the queued events for this pool. -
setNotifyChannelFuturesOnShutdown
public void setNotifyChannelFuturesOnShutdown(boolean notifyOnShutdown) If set tofalseno queuedChannelEventRunnable'sChannelFuturewill get notified onceshutdownNow()is called. If set totrueevery queuedChannelEventRunnablewill get marked as failed viaChannelFuture.setFailure(Throwable).Please only set this to
falseif you want to handle the notification by yourself and know what you are doing. Default istrue. -
getNotifyChannelFuturesOnShutdown
public boolean getNotifyChannelFuturesOnShutdown()Returns if theChannelFuture's of theChannelEventRunnable's should be notified about the shutdown of thisMemoryAwareThreadPoolExecutor. -
execute
- Specified by:
executein interfaceExecutor- Overrides:
executein classThreadPoolExecutor
-
doExecute
Put the actual execution logic here. The default implementation simply callsdoUnorderedExecute(Runnable). -
doUnorderedExecute
Executes the specified task without maintaining the event order. -
remove
- Overrides:
removein classThreadPoolExecutor
-
beforeExecute
- Overrides:
beforeExecutein classThreadPoolExecutor
-
increaseCounter
-
decreaseCounter
-
getChannelCounter
-
shouldCount
Returnstrueif and only if the specifiedtaskshould be counted to limit the global and per-channel memory consumption. To override this method, you must callsuper.shouldCount()to make sure important tasks are not counted.
-