Class MemcachedConnection
java.lang.Object
java.lang.Thread
net.spy.memcached.compat.SpyThread
net.spy.memcached.MemcachedConnection
- All Implemented Interfaces:
Runnable
Main class for handling connections to a memcached cluster.
-
Nested Class Summary
Nested classes/interfaces inherited from class Thread
Thread.Builder, Thread.State, Thread.UncaughtExceptionHandler -
Field Summary
FieldsModifier and TypeFieldDescriptionprotected final ConcurrentLinkedQueue<MemcachedNode> AddedQueue is used to track the QueueAttachments for which operations have recently been queued.private final intThe buffer size that will be used when reading from the server.private final ConnectionFactoryThe connection factory to createMemcachedNodes from.private final Collection<ConnectionObserver> Holds all connection observers that get notified on connection status changes.private static final intBy default, do not bound the retry queue.private static final intThe default wakeup delay if not overridden by a system property.private static final intThe number of empty selects we'll allow before assuming we may have missed one and should check the current selectors.private intContains the current number of empty select() calls, which could indicate bugs.private static final intThe number of empty selects we'll allow before blowing up.protected final FailureModeThe configuredFailureMode.private final ExecutorServiceTheExecutorServiceto use for callbacks.protected final NodeLocatorTheNodeLocatorto use for this connection.private static final intIf an operation gets cloned more than this ceiling, cancel it for safety reasons.private final longMaximum amount of time to wait between reconnect attempts.protected final MetricCollectorTheMetricCollectorto accumulate metrics (or dummy).protected final MetricTypeThe current type of metrics to collect.protected final ConcurrentLinkedQueue<MemcachedNode> Holds all nodes that are scheduled for shutdown.private final OperationFactoryTheOperationFactoryto clone or create operations.private static final Stringprivate static final Stringprivate static final Stringprivate static final Stringprivate static final Stringprivate static final Stringprivate static final Stringprivate static final Stringprivate static final Stringprivate final SortedMap<Long, MemcachedNode> reconnectQueue contains the attachments that need to be reconnected.Holds operations that need to be retried.private final intOptionally bound the retry queue if set via system property.protected booleanTrue if not shutting down or shut down.protected SelectorHolds the currentSelectorto use.private final booleanIf true, optimization will collapse multiple sequential get ops.private static final Stringprotected booleanIf the connection is alread shut down or shutting down.private final intThe threshold for timeout exceptions.private final booleanIf set to true, a proper check after finish connecting is done to see if the node is not responding but really alive.private final intThe selector wakeup delay, defaults to 1000ms.Fields inherited from class Thread
MAX_PRIORITY, MIN_PRIORITY, NORM_PRIORITY -
Constructor Summary
ConstructorsConstructorDescriptionMemcachedConnection(int bufSize, ConnectionFactory f, List<InetSocketAddress> a, Collection<ConnectionObserver> obs, FailureMode fm, OperationFactory opfactory) Construct aMemcachedConnection. -
Method Summary
Modifier and TypeMethodDescriptionbooleanAdd a connection observer.protected voidaddOperation(String key, Operation o) Add an operation to a connection identified by the given key.protected voidaddOperation(MemcachedNode node, Operation o) Enqueue an operation on the given node.voidaddOperations(Map<MemcachedNode, Operation> ops) Enqueue the given list of operations on each handling node.private voidAttempt to reconnectMemcachedNodes in the reconnect queue.(package private) booleanMakes sure that the given node belongs to the current cluster.Broadcast an operation to all nodes.broadcastOperation(BroadcastOpFactory of, Collection<MemcachedNode> nodes) Broadcast an operation to a collection of nodes.private voidCancel the given collection of operations.private voidCheck if one or more nodes exceeded the timeout Threshold.protected voidCheck to see if this connection is shutting down.private voidconnected(MemcachedNode node) Indicate a successful connect to the given node.Construct a String containing information about all nodes and their state.protected List<MemcachedNode> Create connections for the given list of addresses.(package private) static StringdbgBuffer(ByteBuffer b, int size) Convert theByteBufferinto a string for easier debugging.voidenqueueOperation(String key, Operation o) Enqueue the givenOperationwith the used key.private voidfinishConnect(SelectionKey sk, MemcachedNode node) Finish the connect phase and potentially verify its liveness.Returns theNodeLocatorin use for this connection.private voidHelper method forhandleIO()to handle empty select calls.private voidHandle any requests that have been made against the client.voidhandleIO()Handle all IO that flows through the connection.private voidhandleIO(SelectionKey sk) Handle IO for a specific selector.private voidHelper method forhandleIO()to encapsulate everything that needs to be checked on a regular basis that has nothing to do directly with reading and writing data.private voidhandleReads(MemcachedNode node) Handle pending reads for the given node.private voidhandleReadsAndWrites(SelectionKey sk, MemcachedNode node) A helper method forhandleIO(java.nio.channels.SelectionKey)to handle reads and writes if appropriate.private OperationhandleReadsWhenChannelEndOfStream(Operation currentOp, MemcachedNode node, ByteBuffer rbuf) Deal with an operation where the channel reached the end of a stream.protected voidhandleRetryInformation(byte[] retryMessage) Optionally handle retry (NOT_MY_VBUKET) responses.private voidCheck if nodes need to be shut down and do so if needed.protected voidHelper method which gets called if the selector is woken up because of the timeout setting, if has been interrupted or if happens during regular write operation phases.private voidhandleWrites(MemcachedNode node) Handle pending writes for the given node.voidinsertOperation(MemcachedNode node, Operation o) Insert an operation on the given node to the beginning of the queue.booleanReturns whether the connection is shut down or not.private voidLog a exception to different levels depending on the state.private voidlostConnection(MemcachedNode node) Indicate a lost connection to the given node.static voidopSucceeded(Operation op) Reset the timeout counter for the given handling node.static voidopTimedOut(Operation op) Increase the timeout counter for the given handling node.private voidMake sure channel connections are not leaked and properly close under faulty reconnect cirumstances.protected voidqueueReconnect(MemcachedNode node) Enqueue the givenMemcachedNodefor reconnect.private voidreadBufferAndLogMetrics(Operation currentOp, ByteBuffer rbuf, MemcachedNode node) Read from the buffer and add metrics information.voidRedistribute the given operation to (potentially) other nodes.voidRedistribute the given list of operations to (potentially) other nodes.protected voidRegister Metrics for collection.booleanRemove a connection observer.voidAdd a operation to the retry queue.voidrun()Handle IO as long as the application is running.private booleanMake sure that the current selectors make sense.private static voidsetTimeout(Operation op, boolean isTimeout) Set the continuous timeout on an operation.voidshutdown()Shut down all connections and do not accept further incoming ops.toString()Methods inherited from class Thread
activeCount, checkAccess, clone, currentThread, dumpStack, enumerate, getAllStackTraces, getContextClassLoader, getDefaultUncaughtExceptionHandler, getId, getName, getPriority, getStackTrace, getState, getThreadGroup, getUncaughtExceptionHandler, holdsLock, interrupt, interrupted, isAlive, isDaemon, isInterrupted, isVirtual, join, join, join, join, ofPlatform, ofVirtual, onSpinWait, setContextClassLoader, setDaemon, setDefaultUncaughtExceptionHandler, setName, setPriority, setUncaughtExceptionHandler, sleep, sleep, sleep, start, startVirtualThread, stop, threadId, yield
-
Field Details
-
DOUBLE_CHECK_EMPTY
private static final int DOUBLE_CHECK_EMPTYThe number of empty selects we'll allow before assuming we may have missed one and should check the current selectors. This generally indicates a bug, but we'll check it nonetheless.- See Also:
-
EXCESSIVE_EMPTY
private static final int EXCESSIVE_EMPTYThe number of empty selects we'll allow before blowing up. It's too easy to write a bug that causes it to loop uncontrollably. This helps find those bugs and often works around them.- See Also:
-
DEFAULT_WAKEUP_DELAY
private static final int DEFAULT_WAKEUP_DELAYThe default wakeup delay if not overridden by a system property.- See Also:
-
DEFAULT_RETRY_QUEUE_SIZE
private static final int DEFAULT_RETRY_QUEUE_SIZEBy default, do not bound the retry queue.- See Also:
-
MAX_CLONE_COUNT
private static final int MAX_CLONE_COUNTIf an operation gets cloned more than this ceiling, cancel it for safety reasons.- See Also:
-
RECON_QUEUE_METRIC
- See Also:
-
SHUTD_QUEUE_METRIC
- See Also:
-
OVERALL_REQUEST_METRIC
- See Also:
-
OVERALL_AVG_BYTES_WRITE_METRIC
- See Also:
-
OVERALL_AVG_BYTES_READ_METRIC
- See Also:
-
OVERALL_AVG_TIME_ON_WIRE_METRIC
- See Also:
-
OVERALL_RESPONSE_METRIC
- See Also:
-
OVERALL_RESPONSE_RETRY_METRIC
- See Also:
-
OVERALL_RESPONSE_FAIL_METRIC
- See Also:
-
OVERALL_RESPONSE_SUCC_METRIC
- See Also:
-
shutDown
protected volatile boolean shutDownIf the connection is alread shut down or shutting down. -
shouldOptimize
private final boolean shouldOptimizeIf true, optimization will collapse multiple sequential get ops. -
selector
-
locator
TheNodeLocatorto use for this connection. -
failureMode
The configuredFailureMode. -
maxDelay
private final long maxDelayMaximum amount of time to wait between reconnect attempts. -
emptySelects
private int emptySelectsContains the current number of empty select() calls, which could indicate bugs. -
bufSize
private final int bufSizeThe buffer size that will be used when reading from the server. -
connectionFactory
The connection factory to createMemcachedNodes from. -
addedQueue
AddedQueue is used to track the QueueAttachments for which operations have recently been queued. -
reconnectQueue
reconnectQueue contains the attachments that need to be reconnected. The key is the time at which they are eligible for reconnect. -
running
protected volatile boolean runningTrue if not shutting down or shut down. -
connObservers
Holds all connection observers that get notified on connection status changes. -
opFact
TheOperationFactoryto clone or create operations. -
timeoutExceptionThreshold
private final int timeoutExceptionThresholdThe threshold for timeout exceptions. -
retryOps
-
nodesToShutdown
Holds all nodes that are scheduled for shutdown. -
verifyAliveOnConnect
private final boolean verifyAliveOnConnectIf set to true, a proper check after finish connecting is done to see if the node is not responding but really alive. -
listenerExecutorService
TheExecutorServiceto use for callbacks. -
metrics
TheMetricCollectorto accumulate metrics (or dummy). -
metricType
The current type of metrics to collect. -
wakeupDelay
private final int wakeupDelayThe selector wakeup delay, defaults to 1000ms. -
retryQueueSize
private final int retryQueueSizeOptionally bound the retry queue if set via system property.
-
-
Constructor Details
-
MemcachedConnection
public MemcachedConnection(int bufSize, ConnectionFactory f, List<InetSocketAddress> a, Collection<ConnectionObserver> obs, FailureMode fm, OperationFactory opfactory) throws IOException Construct aMemcachedConnection.- Parameters:
bufSize- the size of the buffer used for reading from the server.f- the factory that will provide an operation queue.a- the addresses of the servers to connect to.obs- the initial observers to add.fm- the failure mode to use.opfactory- the operation factory.- Throws:
IOException- if a connection attempt fails early
-
-
Method Details
-
registerMetrics
protected void registerMetrics()Register Metrics for collection. Note that these Metrics may or may not take effect, depending on theMetricCollectorimplementation. This can be controlled from theDefaultConnectionFactory. -
createConnections
protected List<MemcachedNode> createConnections(Collection<InetSocketAddress> addrs) throws IOException Create connections for the given list of addresses.- Parameters:
addrs- the list of addresses to connect to.- Returns:
- addrs list of
MemcachedNodes. - Throws:
IOException- if connecting was not successful.
-
selectorsMakeSense
private boolean selectorsMakeSense()Make sure that the current selectors make sense.- Returns:
- true if they do.
-
handleIO
Handle all IO that flows through the connection. This method is called in an endless loop, listens on NIO selectors and dispatches the underlying read/write calls if needed.- Throws:
IOException
-
handleWokenUpSelector
protected void handleWokenUpSelector()Helper method which gets called if the selector is woken up because of the timeout setting, if has been interrupted or if happens during regular write operation phases.This method can be overriden by child implementations to handle custom behavior on a manually woken selector, like sending pings through the channels to make sure they are alive.
Note that there is no guarantee that this method is at all or in the regular interval called, so all overriding implementations need to take that into account. Also, it needs to take into account that it may be called very often under heavy workloads, so it should not perform extensive tasks in the same thread.
-
handleOperationalTasks
Helper method forhandleIO()to encapsulate everything that needs to be checked on a regular basis that has nothing to do directly with reading and writing data.- Throws:
IOException- if an error happens during shutdown queue handling.
-
handleEmptySelects
private void handleEmptySelects()Helper method forhandleIO()to handle empty select calls. -
handleShutdownQueue
Check if nodes need to be shut down and do so if needed.- Throws:
IOException- if the channel could not be closed properly.
-
checkPotentiallyTimedOutConnection
private void checkPotentiallyTimedOutConnection()Check if one or more nodes exceeded the timeout Threshold. -
handleInputQueue
private void handleInputQueue()Handle any requests that have been made against the client. -
addObserver
Add a connection observer.- Returns:
- whether the observer was successfully added.
-
removeObserver
Remove a connection observer.- Returns:
- true if the observer existed and now doesn't.
-
connected
Indicate a successful connect to the given node.- Parameters:
node- the node which was successfully connected.
-
lostConnection
Indicate a lost connection to the given node.- Parameters:
node- the node where the connection was lost.
-
belongsToCluster
Makes sure that the given node belongs to the current cluster. Before trying to connect to a node, make sure it actually belongs to the currently connected cluster. -
handleIO
Handle IO for a specific selector. Any IOException will cause a reconnect. Note that this code makes sure that the corresponding node is not only able to connect, but also able to respond in a correct fashion (if verifyAliveOnConnect is set to true through a property). This is handled by issuing a dummy version/noop call and making sure it returns in a correct and timely fashion.- Parameters:
sk- the selector to handle IO against.
-
handleReadsAndWrites
A helper method forhandleIO(java.nio.channels.SelectionKey)to handle reads and writes if appropriate.- Parameters:
sk- the selection key to use.node- th enode to read write from.- Throws:
IOException- if an error occurs during read/write.
-
finishConnect
Finish the connect phase and potentially verify its liveness.- Parameters:
sk- the selection key for the node.node- the actual node.- Throws:
IOException- if something goes wrong during reading/writing.
-
handleWrites
Handle pending writes for the given node.- Parameters:
node- the node to handle writes for.- Throws:
IOException- can be raised during writing failures.
-
handleReads
Handle pending reads for the given node.- Parameters:
node- the node to handle reads for.- Throws:
IOException- can be raised during reading failures.
-
readBufferAndLogMetrics
private void readBufferAndLogMetrics(Operation currentOp, ByteBuffer rbuf, MemcachedNode node) throws IOException Read from the buffer and add metrics information.- Parameters:
currentOp- the current operation to read.rbuf- the read buffer to read from.node- the node to read from.- Throws:
IOException- if reading was not successful.
-
handleReadsWhenChannelEndOfStream
private Operation handleReadsWhenChannelEndOfStream(Operation currentOp, MemcachedNode node, ByteBuffer rbuf) throws IOException Deal with an operation where the channel reached the end of a stream.- Parameters:
currentOp- the current operation to read.node- the node for that operation.rbuf- the read buffer.- Returns:
- the next operation on the node to read.
- Throws:
IOException- if disconnect while reading.
-
dbgBuffer
Convert theByteBufferinto a string for easier debugging.- Parameters:
b- the buffer to debug.size- the size of the buffer.- Returns:
- the stringified
ByteBuffer.
-
handleRetryInformation
protected void handleRetryInformation(byte[] retryMessage) Optionally handle retry (NOT_MY_VBUKET) responses. This method can be overridden in subclasses to handle the content of the retry message appropriately.- Parameters:
retryMessage- the body of the retry message.
-
queueReconnect
Enqueue the givenMemcachedNodefor reconnect.- Parameters:
node- the node to reconnect.
-
cancelOperations
Cancel the given collection of operations.- Parameters:
ops- the list of operations to cancel.
-
redistributeOperations
Redistribute the given list of operations to (potentially) other nodes. Note that operations can only be redistributed if they have not been cancelled already, timed out already or do not have definite targets (a key).- Parameters:
ops- the operations to redistribute.
-
redistributeOperation
Redistribute the given operation to (potentially) other nodes. Note that operations can only be redistributed if they have not been cancelled already, timed out already or do not have definite targets (a key).- Parameters:
op- the operation to redistribute.
-
attemptReconnects
private void attemptReconnects()Attempt to reconnectMemcachedNodes in the reconnect queue. If theMemcachedNodedoes not belong to the cluster list anymore, the reconnect attempt is cancelled. If it does, the code tries to reconnect immediately and if this is not possible it waits until the connection information arrives. Note that if a socket error arises during reconnect, the node is scheduled for re-reconnect immediately. -
potentiallyCloseLeakingChannel
Make sure channel connections are not leaked and properly close under faulty reconnect cirumstances.- Parameters:
ch- the channel to potentially close.node- the node to which the channel should be bound to.
-
getLocator
Returns theNodeLocatorin use for this connection.- Returns:
- the current
NodeLocator.
-
enqueueOperation
-
addOperation
Add an operation to a connection identified by the given key. If theMemcachedNodeis active or theFailureModeis set to retry, the primary node will be used for that key. If the primary node is not available and theFailureModecancel is used, the operation will be cancelled without further retry. For any otherFailureModemechanisms (Redistribute), another possible node is used (only if its active as well). If no other active node could be identified, the original primary node is used and retried.- Parameters:
key- the key the operation is operating upon.o- the operation to add.
-
insertOperation
Insert an operation on the given node to the beginning of the queue.- Parameters:
node- the node where to insert theOperation.o- the operation to insert.
-
addOperation
Enqueue an operation on the given node.- Parameters:
node- the node where to enqueue theOperation.o- the operation to add.
-
addOperations
Enqueue the given list of operations on each handling node.- Parameters:
ops- the operations for each node.
-
broadcastOperation
Broadcast an operation to all nodes.- Returns:
- a
CountDownLatchthat will be counted down when the operations are complete.
-
broadcastOperation
Broadcast an operation to a collection of nodes.- Returns:
- a
CountDownLatchthat will be counted down when the operations are complete.
-
shutdown
Shut down all connections and do not accept further incoming ops.- Throws:
IOException
-
toString
-
connectionsStatus
Construct a String containing information about all nodes and their state.- Returns:
- a stringified representation of the connection status.
-
opTimedOut
Increase the timeout counter for the given handling node.- Parameters:
op- the operation to grab the node from.
-
opSucceeded
Reset the timeout counter for the given handling node.- Parameters:
op- the operation to grab the node from.
-
setTimeout
Set the continuous timeout on an operation. Ignore operations which have no handling nodes set yet (which may happen before nodes are properly authenticated).- Parameters:
op- the operation to use.isTimeout- is timed out or not.
-
checkState
protected void checkState()Check to see if this connection is shutting down.- Throws:
IllegalStateException- when shutting down.
-
run
-
logRunException
Log a exception to different levels depending on the state. Exceptions get logged at debug level when happening during shutdown, but at warning level when operating normally.- Parameters:
e- the exception to log.
-
isShutDown
public boolean isShutDown()Returns whether the connection is shut down or not.- Returns:
- true if the connection is shut down, false otherwise.
-
retryOperation
Add a operation to the retry queue. If the retry queue size is bounded and the size of the queue is reaching that boundary, the operation is cancelled rather than added to the retry queue.- Parameters:
op- the operation to retry.
-