spymemcached Source Code Analysis
Topics
Core concepts
IO model
Server management
Serialization
set analysis
get analysis
cas analysis
Further reading
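Core concepts
spymemcached is an open-source Java client for memcached. Before studying any piece of software in detail, it helps to understand it as a whole: what is its central domain model? We start with its core concepts and how they fit together.
MemcachedClient: this is the object application programmers construct directly, and every memcached operation is submitted through it. As the constructor shows, the client initializes its collaborating objects when it is built, and creating the connection starts the IO model.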
public MemcachedClient(ConnectionFactory cf, List<InetSocketAddress> addrs)
throws IOException {
if (cf == null) {
throw new NullPointerException("Connection factory required");
}
if (addrs == null) {
throw new NullPointerException("Server list required");
}
if (addrs.isEmpty()) {
throw new IllegalArgumentException("You must have at least one server to"
+ " connect to");
}
if (cf.getOperationTimeout() <= 0) {
throw new IllegalArgumentException("Operation timeout must be positive.");
}
connFactory = cf;
tcService = new TranscodeService(cf.isDaemon());
transcoder = cf.getDefaultTranscoder();
opFact = cf.getOperationFactory();
assert opFact != null : "Connection factory failed to make op factory";
mconn = cf.createConnection(addrs);
assert mconn != null : "Connection factory failed to make a connection";
operationTimeout = cf.getOperationTimeout();
authDescriptor = cf.getAuthDescriptor();
executorService = cf.getListenerExecutorService();
if (authDescriptor != null) {
addObserver(this);
}
}
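MemcachedNode: one node corresponds to one memcached server.
In the class diagram, the main node implementation is TCPMemcachedNodeImpl, which has two protocol-specific subclasses, one for the ascii protocol and one for the binary protocol.
Let us first look at the data structures of TCPMemcachedNodeImpl: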
private final SocketAddress socketAddress; // address of the server
private final ByteBuffer rbuf; // read buffer
private final ByteBuffer wbuf; // write buffer
protected final BlockingQueue<Operation> writeQ; // operations waiting to be written
private final BlockingQueue<Operation> readQ; // operations waiting for a response
private final BlockingQueue<Operation> inputQueue; // newly submitted operations
private final long opQueueMaxBlockTime; // maximum time to block on the operation queue
private final long authWaitTime; // how long to wait for authentication
private final ConnectionFactory connectionFactory; // connection factory
private AtomicInteger reconnectAttempt = new AtomicInteger(1); // atomic reconnect counter
private SocketChannel channel; // channel for this connection
private int toWrite = 0; // total number of bytes to write
protected Operation optimizedOp = null; // the current op
private volatile SelectionKey sk = null; // selection key registered with the main selector
private boolean shouldAuth = false;
private CountDownLatch authLatch;
private ArrayList<Operation> reconnectBlocked; // operations blocked while reconnecting
private long defaultOpTimeout;
private volatile long lastReadTimestamp = System.nanoTime();
private MemcachedConnection connection; // the connection that owns this node
// operation Future.get timeout counter
private final AtomicInteger continuousTimeout = new AtomicInteger(0);
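The three queues can be read as a pipeline: a submitted operation lands in inputQueue, moves to writeQ until its bytes are on the wire, and then waits in readQ for its response. The following is a minimal sketch of that flow, not spymemcached code: the class NodeQueueSketch is hypothetical, operations are plain strings, and the method names only borrow from the MemcachedNode interface for orientation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for a node; operations are plain strings here.
class NodeQueueSketch {
    private final BlockingQueue<String> inputQueue = new LinkedBlockingQueue<>();
    private final BlockingQueue<String> writeQ = new LinkedBlockingQueue<>();
    private final BlockingQueue<String> readQ = new LinkedBlockingQueue<>();

    // Application threads submit operations here.
    void addOp(String op) {
        inputQueue.add(op);
    }

    // IO thread: move everything submitted so far into the write queue.
    void copyInputQueue() {
        List<String> tmp = new ArrayList<>();
        inputQueue.drainTo(tmp);
        writeQ.addAll(tmp);
    }

    // IO thread: the head operation was fully written and now awaits a response.
    void transitionWriteItem() {
        String op = writeQ.poll();
        if (op != null) {
            readQ.add(op);
        }
    }

    // IO thread: the response for the oldest in-flight operation was parsed.
    String removeCurrentReadOp() {
        return readQ.poll();
    }

    int pendingWrites() { return writeQ.size(); }
    int pendingReads() { return readQ.size(); }
}
```

Because responses arrive in request order on a single connection, a FIFO read queue is enough to match each response to its operation.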
Next, the main methods of the MemcachedNode interface:
/**
* Interface defining a connection to a memcached server.
*/
public interface MemcachedNode {
/**
* Move all of the operations delivered via addOperation into the internal
* write queue.
*/
void copyInputQueue();
/**
* Extract all queued items for this node destructively.
* Note: this copies the contents of the input queue out and empties the queue.
* This is useful for redistributing items.
*/
Collection<Operation> destroyInputQueue();
/** Note: cancels the queued requests, or sets them up for resend, and clears the queue.
* Clear the queue of currently processing operations by either cancelling
* them or setting them up to be reapplied after a reconnect.
*/
void setupResend();
/**
* Note: copies the buf data of the queued operations into the node's wbuf.
* Fill the write buffer with data from the next operations in the queue.
*
* @param optimizeGets if true, combine sequential gets into a single
* multi-key get
*/
void fillWriteBuffer(boolean optimizeGets);
/**
* Transition the current write item into a read state.
*/
void transitionWriteItem();
/**
* Get the operation at the top of the queue that is requiring input.
*/
Operation getCurrentReadOp();
/**
* Remove the operation at the top of the queue that is requiring input.
*/
Operation removeCurrentReadOp();
/**
* Get the operation at the top of the queue that has information available to
* write.
*/
Operation getCurrentWriteOp();
/**
* Remove the operation at the top of the queue that has information available
* to write.
*/
Operation removeCurrentWriteOp();
/**
* True if an operation is available to read.
*/
boolean hasReadOp();
/**
* True if an operation is available to write.
*/
boolean hasWriteOp();
/**
* Add an operation to the queue. Authentication operations should never be
* added to the queue, but this is not checked.
*/
void addOp(Operation op);
/**
* Insert an operation to the beginning of the queue.
*
* This method is meant to be invoked rarely.
*/
void insertOp(Operation o);
/**
* Compute the appropriate selection operations for the channel this
* MemcachedNode holds to the server.
*/
int getSelectionOps();
/**
* Get the buffer used for reading data from this node.
*/
ByteBuffer getRbuf();
/**
* Get the buffer used for writing data to this node.
*/
ByteBuffer getWbuf();
/**
* Get the SocketAddress of the server to which this node is connected.
*/
SocketAddress getSocketAddress();
/**
* True if this node is <q>active.</q> i.e. it is currently connected and
* expected to be able to process requests
*/
boolean isActive();
/**
* True if this node is <q>authenticated.</q>
*/
boolean isAuthenticated();
/** Note: the time elapsed between now and the last read.
* Milliseconds since last successful read.
*/
long lastReadDelta();
/**
* Notify node of successful read.
*
* This is used so the node can keep track of any internal debugging or
* state it cares about on read.
*/
void completedRead();
/**
* Notify this node that it will be reconnecting.
*/
void reconnecting();
/**
* Notify this node that it has reconnected.
*/
void connected();
/**
* Get the current reconnect count.
*/
int getReconnectCount();
/**
* Register a channel with this node.
*/
void registerChannel(SocketChannel ch, SelectionKey selectionKey);
/**
* Set the SocketChannel this node uses.
*/
void setChannel(SocketChannel to);
/**
* Get the SocketChannel for this connection.
*/
SocketChannel getChannel();
/** Note: sets the SelectionKey.
* It is first created by ch.register(selector, ops, qa).
* Set the selection key for this node.
*/
void setSk(SelectionKey to);
/**
* Get the selection key from this node.
*/
SelectionKey getSk();
/**
* Get the number of bytes remaining to write.
*/
int getBytesRemainingToWrite();
/**
* Write some bytes and return the number of bytes written.
* Note: writes the data to the channel that belongs to this node.
* @return the number of bytes written
* @throws IOException if there's a problem writing
*/
int writeSome() throws IOException;
/** Note: sets the events the SelectionKey is interested in; the implementation makes the details clear.
*
* Fix up the selection ops on the selection key.
*/
void fixupOps();
/**
* Let the node know that auth is complete. Typically this would mean the node
* can start processing and accept new operations to its input queue.
*/
void authComplete();
/**
* Tell a node to set up for authentication. Typically this would mean
* blocking additions to the queue. In a reconnect situation this may mean
* putting any queued operations on hold to get to an auth complete state.
*/
void setupForAuth();
/**
* Count 'time out' exceptions to drop connections that fail perpetually.
*
* @param timedOut
*/
void setContinuousTimeout(boolean timedOut);
int getContinuousTimeout();
MemcachedConnection getConnection();
void setConnection(MemcachedConnection connection);
}
MemcachedConnection: a memcached client has exactly one connection instance, and a connection holds a list of MemcachedNode objects. Here are the core data structures of the connection:
/**
* The number of empty selects we'll allow before assuming we may have
* missed one and should check the current selectors. This generally
* indicates a bug, but we'll check it nonetheless.
*/
private static final int DOUBLE_CHECK_EMPTY = 256;
/**
* The number of empty selects we'll allow before blowing up. It's too
* easy to write a bug that causes it to loop uncontrollably. This helps
* find those bugs and often works around them.
*/
private static final int EXCESSIVE_EMPTY = 0x1000000;
/**
* The default wakeup delay if not overridden by a system property.
*/
private static final int DEFAULT_WAKEUP_DELAY = 1000;
/**
* If an operation gets cloned more than this ceiling, cancel it for
* safety reasons.
*/
private static final int MAX_CLONE_COUNT = 100;
/**
* If the connection is already shut down or shutting down.
*/
protected volatile boolean shutDown = false;
/**
* If true, optimization will collapse multiple sequential get ops.
*/
private final boolean shouldOptimize;
/**
* Holds the current {@link Selector} to use.
*/
protected Selector selector = null;
/**
* The {@link NodeLocator} to use for this connection.
*/
protected final NodeLocator locator;
/**
* The configured {@link FailureMode}.
*/
protected final FailureMode failureMode;
/**
* Maximum amount of time to wait between reconnect attempts.
*/
private final long maxDelay;
/**
* Contains the current number of empty select() calls, which could indicate
* bugs.
*/
private int emptySelects = 0;
/**
* The buffer size that will be used when reading from the server.
*/
private final int bufSize;
/**
* The connection factory to create {@link MemcachedNode}s from.
*/
private final ConnectionFactory connectionFactory;
/**
* AddedQueue is used to track the QueueAttachments for which operations
* have recently been queued.
*/
protected final ConcurrentLinkedQueue<MemcachedNode> addedQueue;
/**
* reconnectQueue contains the attachments that need to be reconnected.
* The key is the time at which they are eligible for reconnect.
*/
private final SortedMap<Long, MemcachedNode> reconnectQueue;
/**
* True if not shutting down or shut down.
*/
protected volatile boolean running = true;
/**
* Holds all connection observers that get notified on connection status
* changes.
*/
private final Collection<ConnectionObserver> connObservers =
new ConcurrentLinkedQueue<ConnectionObserver>();
/**
* The {@link OperationFactory} to clone or create operations.
*/
private final OperationFactory opFact;
/**
* The threshold for timeout exceptions.
*/
private final int timeoutExceptionThreshold;
/**
* Holds operations that need to be retried.
*/
private final List<Operation> retryOps;
/**
* Holds all nodes that are scheduled for shutdown.
*/
protected final ConcurrentLinkedQueue<MemcachedNode> nodesToShutdown;
/**
* If set to true, a proper check after finish connecting is done to see
* if the node is not responding but really alive.
*/
private final boolean verifyAliveOnConnect;
/**
* The {@link ExecutorService} to use for callbacks.
*/
private final ExecutorService listenerExecutorService;
/**
* The {@link MetricCollector} to accumulate metrics (or dummy).
*/
protected final MetricCollector metrics;
/**
* The current type of metrics to collect.
*/
protected final MetricType metricType;
/**
* The selector wakeup delay, defaults to 1000ms.
*/
private final int wakeupDelay;
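SerializingTranscoder: memcached is a key-value server, so value objects must be serialized into byte arrays before they are stored. SerializingTranscoder handles that conversion for the different object types.
OperationFactory: creates the different kinds of Operation objects. memcached has two protocols, ascii and binary, and ascii is the default. This analysis covers only the ascii protocol, whose concrete implementation is AsciiOperationFactory.
Operation: a single operation against memcached. Operations also come in two groups, ascii and binary.
OperationCallback: the callback invoked when an operation reports its result. Because spymemcached submits requests through asynchronous IO, results have to be delivered through a callback implementation.
The relationships between them are as follows: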
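IO model
spymemcached uses a single-threaded, asynchronous NIO design to manage all requests and connections.
The IO model starts up along this path: construct MemcachedClient -> createConnection -> start().
MemcachedConnection extends Thread (through SpyThread), so start() actually launches a thread. Here is that thread's run method: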
public void run() {
while (running) {
try {
handleIO();
} catch (IOException e) {
logRunException(e);
} catch (CancelledKeyException e) {
logRunException(e);
} catch (ClosedSelectorException e) {
logRunException(e);
} catch (IllegalStateException e) {
logRunException(e);
} catch (ConcurrentModificationException e) {
logRunException(e);
}
}
getLogger().info("Shut down memcached client");
}
So the core IO function is handleIO. Here it is:
public void handleIO() throws IOException {
if (shutDown) {
getLogger().debug("No IO while shut down.");
return;
}
handleInputQueue();
getLogger().debug("Done dealing with queue.");
long delay = 1000;
if (!reconnectQueue.isEmpty()) {
long now = System.currentTimeMillis();
long then = reconnectQueue.firstKey();
delay = Math.max(then - now, 1);
}
getLogger().debug("Selecting with delay of %sms", delay);
assert selectorsMakeSense() : "Selectors don't make sense.";
int selected = selector.select(delay);
if (shutDown) {
return;
} else if (selected == 0 && addedQueue.isEmpty()) {
handleWokenUpSelector();
} else if (selector.selectedKeys().isEmpty()) {
handleEmptySelects();
} else {
getLogger().debug("Selected %d, selected %d keys", selected,
selector.selectedKeys().size());
emptySelects = 0;
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while(iterator.hasNext()) {
SelectionKey sk = iterator.next();
handleIO(sk);
iterator.remove();
}
}
handleOperationalTasks();
}
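The select timeout in handleIO is worth isolating: with nothing to reconnect, the selector blocks for the default delay; otherwise it blocks only until the earliest scheduled reconnect, and never for less than 1 ms. A minimal sketch of that calculation, where the class SelectDelaySketch and the use of strings as node placeholders are assumptions made for illustration:

```java
import java.util.SortedMap;

class SelectDelaySketch {
    // reconnectQueue maps "time at which a reconnect is due" to a node placeholder.
    // Returns how long select() may block, in milliseconds.
    static long selectDelay(SortedMap<Long, String> reconnectQueue,
                            long now, long defaultDelay) {
        if (reconnectQueue.isEmpty()) {
            return defaultDelay;
        }
        long then = reconnectQueue.firstKey(); // earliest due time
        return Math.max(then - now, 1);        // overdue reconnects wake up almost at once
    }
}
```

Keying the map by due time means firstKey() is always the next reconnect to attempt, so the loop never has to scan the whole queue.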
The method goes through several stages:
handleInputQueue
private void handleInputQueue() {
if (!addedQueue.isEmpty()) {
getLogger().debug("Handling queue");
Collection<MemcachedNode> toAdd = new HashSet<MemcachedNode>();
Collection<MemcachedNode> todo = new HashSet<MemcachedNode>();
MemcachedNode qaNode;
while ((qaNode = addedQueue.poll()) != null) {
todo.add(qaNode);
}
for (MemcachedNode node : todo) {
boolean readyForIO = false;
if (node.isActive()) {
if (node.getCurrentWriteOp() != null) {
readyForIO = true;
getLogger().debug("Handling queued write %s", node);
}
} else {
toAdd.add(node);
}
node.copyInputQueue();
if (readyForIO) {
try {
if (node.getWbuf().hasRemaining()) {
handleWrites(node);
}
} catch (IOException e) {
getLogger().warn("Exception handling write", e);
lostConnection(node);
}
}
node.fixupOps();
}
addedQueue.addAll(toAdd);
}
}
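This method drains the nodes from addedQueue. For an active node that already has a pending write operation, the write is handled right away; a node that is not active is put back into addedQueue.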
handleIO(SelectionKey)
private void handleIO(final SelectionKey sk) {
MemcachedNode node = (MemcachedNode) sk.attachment();
try {
getLogger().debug("Handling IO for: %s (r=%s, w=%s, c=%s, op=%s)", sk,
sk.isReadable(), sk.isWritable(), sk.isConnectable(),
sk.attachment());
if (sk.isConnectable() && belongsToCluster(node)) {
getLogger().debug("Connection state changed for %s", sk);
final SocketChannel channel = node.getChannel();
if (channel.finishConnect()) {
finishConnect(sk, node);
} else {
assert !channel.isConnected() : "connected";
}
} else {
handleReadsAndWrites(sk, node);
}
} catch (ClosedChannelException e) {
if (!shutDown) {
getLogger().info("Closed channel and not shutting down. Queueing"
+ " reconnect on %s", node, e);
lostConnection(node);
}
} catch (ConnectException e) {
getLogger().info("Reconnecting due to failure to connect to %s", node, e);
queueReconnect(node);
} catch (OperationException e) {
node.setupForAuth();
getLogger().info("Reconnection due to exception handling a memcached "
+ "operation on %s. This may be due to an authentication failure.",
node, e);
lostConnection(node);
} catch (Exception e) {
node.setupForAuth();
getLogger().info("Reconnecting due to exception on %s", node, e);
lostConnection(node);
}
node.fixupOps();
}
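This method decides how to handle the event based on the current state of the SelectionKey. If sk.isConnectable() is true, meaning a connection has just been established, it calls finishConnect; otherwise it calls handleReadsAndWrites directly.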
private void finishConnect(final SelectionKey sk, final MemcachedNode node)
throws IOException {
if (verifyAliveOnConnect) {
final CountDownLatch latch = new CountDownLatch(1);
final OperationFuture<Boolean> rv = new OperationFuture<Boolean>("noop",
latch, 2500, listenerExecutorService);
NoopOperation testOp = opFact.noop(new OperationCallback() {
public void receivedStatus(OperationStatus status) {
rv.set(status.isSuccess(), status);
}
@Override
public void complete() {
latch.countDown();
}
});
testOp.setHandlingNode(node);
testOp.initialize();
checkState();
insertOperation(node, testOp);
node.copyInputQueue();
boolean done = false;
if (sk.isValid()) {
long timeout = TimeUnit.MILLISECONDS.toNanos(
connectionFactory.getOperationTimeout());
long stop = System.nanoTime() + timeout;
while (stop > System.nanoTime()) {
handleWrites(node);
handleReads(node);
if(done = (latch.getCount() == 0)) {
break;
}
}
}
if (!done || testOp.isCancelled() || testOp.hasErrored()
|| testOp.isTimedOut()) {
throw new ConnectException("Could not send noop upon connect! "
+ "This may indicate a running, but not responding memcached "
+ "instance.");
}
}
connected(node);
addedQueue.offer(node);
if (node.getWbuf().hasRemaining()) {
handleWrites(node);
}
}
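When verifyAliveOnConnect is set, finishConnect builds a NoopOperation (in practice a VersionOperationImpl) and uses it to talk to the server. If the server does not respond or the call times out, it throws an exception. On success, it resets the reconnect counter.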
handleWrites
private void handleWrites(final MemcachedNode node) throws IOException {
node.fillWriteBuffer(shouldOptimize);
boolean canWriteMore = node.getBytesRemainingToWrite() > 0;
while (canWriteMore) {
int wrote = node.writeSome();
metrics.updateHistogram(OVERALL_AVG_BYTES_WRITE_METRIC, wrote);
node.fillWriteBuffer(shouldOptimize);
canWriteMore = wrote > 0 && node.getBytesRemainingToWrite() > 0;
}
}
This method is fairly simple: the node first copies data into wbuf, and then wbuf is written to the channel.
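The loop in handleWrites exists because a non-blocking channel may accept only part of the buffer per call. The sketch below reproduces just that loop under stated assumptions: ThrottledChannel is a hypothetical channel that takes at most a fixed number of bytes per write, and refilling the buffer from further operations is left out.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;

class PartialWriteSketch {
    // Hypothetical channel accepting at most `limit` bytes per write call,
    // imitating a non-blocking socket with a small send buffer.
    static final class ThrottledChannel implements WritableByteChannel {
        final ByteArrayOutputStream sink = new ByteArrayOutputStream();
        private final int limit;
        int calls = 0;

        ThrottledChannel(int limit) { this.limit = limit; }

        @Override public int write(ByteBuffer src) {
            calls++;
            int n = Math.min(limit, src.remaining());
            for (int i = 0; i < n; i++) {
                sink.write(src.get());
            }
            return n;
        }
        @Override public boolean isOpen() { return true; }
        @Override public void close() { }
    }

    // Keep writing while the channel accepts bytes and data remains.
    static int drain(ByteBuffer wbuf, WritableByteChannel ch) {
        int total = 0;
        try {
            boolean canWriteMore = wbuf.hasRemaining();
            while (canWriteMore) {
                int wrote = ch.write(wbuf);
                total += wrote;
                // A write of 0 bytes means the channel is full: stop and wait
                // for the selector to report it writable again.
                canWriteMore = wrote > 0 && wbuf.hasRemaining();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return total;
    }

    static ByteBuffer ascii(String s) {
        return ByteBuffer.wrap(s.getBytes(StandardCharsets.US_ASCII));
    }
}
```

Stopping on a zero-byte write, instead of spinning, is what keeps a single IO thread free to serve the other nodes.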
handleReads
This function is more involved:
private void handleReads(final MemcachedNode node) throws IOException {
Operation currentOp = node.getCurrentReadOp();
if (currentOp instanceof TapAckOperationImpl) {
node.removeCurrentReadOp();
return;
}
ByteBuffer rbuf = node.getRbuf();
final SocketChannel channel = node.getChannel();
int read = channel.read(rbuf);
metrics.updateHistogram(OVERALL_AVG_BYTES_READ_METRIC, read);
if (read < 0) {
currentOp = handleReadsWhenChannelEndOfStream(currentOp, node, rbuf);
}
while (read > 0) {
getLogger().debug("Read %d bytes", read);
rbuf.flip();
while (rbuf.remaining() > 0) {
if (currentOp == null) {
throw new IllegalStateException("No read operation.");
}
long timeOnWire =
System.nanoTime() - currentOp.getWriteCompleteTimestamp();
metrics.updateHistogram(OVERALL_AVG_TIME_ON_WIRE_METRIC,
(int)(timeOnWire / 1000));
metrics.markMeter(OVERALL_RESPONSE_METRIC);
synchronized(currentOp) {
readBufferAndLogMetrics(currentOp, rbuf, node);
}
currentOp = node.getCurrentReadOp();
}
rbuf.clear();
read = channel.read(rbuf);
node.completedRead();
}
}
If the read returns a negative value, the channel has reached end of stream, and this method is called:
private Operation handleReadsWhenChannelEndOfStream(final Operation currentOp,
final MemcachedNode node, final ByteBuffer rbuf) throws IOException {
if (currentOp instanceof TapOperation) {
currentOp.getCallback().complete();
((TapOperation) currentOp).streamClosed(OperationState.COMPLETE);
getLogger().debug("Completed read op: %s and giving the next %d bytes",
currentOp, rbuf.remaining());
Operation op = node.removeCurrentReadOp();
assert op == currentOp : "Expected to pop " + currentOp + " got " + op;
return node.getCurrentReadOp();
} else {
throw new IOException("Disconnected unexpected, will reconnect.");
}
}
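For a TapOperation, this method invokes the complete method of the operation's callback; for any other operation it throws an IOException.
If there is data to read, readBufferAndLogMetrics is called to read it: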
private void readBufferAndLogMetrics(final Operation currentOp,
final ByteBuffer rbuf, final MemcachedNode node) throws IOException {
currentOp.readFromBuffer(rbuf);
if (currentOp.getState() == OperationState.COMPLETE) {
getLogger().debug("Completed read op: %s and giving the next %d "
+ "bytes", currentOp, rbuf.remaining());
Operation op = node.removeCurrentReadOp();
assert op == currentOp : "Expected to pop " + currentOp + " got "
+ op;
if (op.hasErrored()) {
metrics.markMeter(OVERALL_RESPONSE_FAIL_METRIC);
} else {
metrics.markMeter(OVERALL_RESPONSE_SUCC_METRIC);
}
} else if (currentOp.getState() == OperationState.RETRY) {
handleRetryInformation(currentOp.getErrorMsg());
getLogger().debug("Reschedule read op due to NOT_MY_VBUCKET error: "
+ "%s ", currentOp);
((VBucketAware) currentOp).addNotMyVbucketNode(
currentOp.getHandlingNode());
Operation op = node.removeCurrentReadOp();
assert op == currentOp : "Expected to pop " + currentOp + " got "
+ op;
retryOps.add(currentOp);
metrics.markMeter(OVERALL_RESPONSE_RETRY_METRIC);
}
}
The key statement in this method is: currentOp.readFromBuffer(rbuf);
Here is the implementation of that function:
public void readFromBuffer(ByteBuffer data) throws IOException {
// Loop while there's data remaining to get it all drained.
while (getState() != OperationState.COMPLETE && data.remaining() > 0) {
if (readType == OperationReadType.DATA) {
handleRead(data);
} else {
int offset = -1;
for (int i = 0; data.remaining() > 0; i++) {
byte b = data.get();
if (b == '\r') {
foundCr = true;
} else if (b == '\n') {
assert foundCr : "got a \\n without a \\r";
offset = i;
foundCr = false;
break;
} else {
assert !foundCr : "got a \\r without a \\n";
byteBuffer.write(b);
}
}
if (offset >= 0) {
String line = new String(byteBuffer.toByteArray(), CHARSET);
byteBuffer.reset();
OperationErrorType eType = classifyError(line);
if (eType != null) {
errorMsg = line.getBytes();
handleError(eType, line);
} else {
handleLine(line);
}
}
}
}
}
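As the code shows, the ascii protocol is a plain text protocol, and the data is split into lines on \r\n.
After a line has been read, it is validated first: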
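A response line can be split across several socket reads, so the parser has to carry an unfinished line over to the next buffer. The sketch below shows only that framing idea. The class LineFramerSketch is hypothetical, it skips the data-block mode that the real operation switches into, and it treats '\r' leniently instead of asserting on it.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class LineFramerSketch {
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();
    private final List<String> lines = new ArrayList<>();

    // Consume every byte in the buffer and emit a line each time '\n' is seen.
    // Bytes of an unfinished line stay in `pending` until the next call.
    void feed(ByteBuffer data) {
        while (data.hasRemaining()) {
            byte b = data.get();
            if (b == '\r') {
                continue; // first half of the terminator carries no content
            } else if (b == '\n') {
                lines.add(new String(pending.toByteArray(), StandardCharsets.UTF_8));
                pending.reset();
            } else {
                pending.write(b);
            }
        }
    }

    List<String> lines() {
        return lines;
    }

    static ByteBuffer ascii(String s) {
        return ByteBuffer.wrap(s.getBytes(StandardCharsets.US_ASCII));
    }
}
```

Feeding "VAL", then "UE k 0 1\r", then "\nEND\r\n" yields the two lines "VALUE k 0 1" and "END", regardless of where the chunk boundaries fall.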
OperationErrorType classifyError(String line) {
OperationErrorType rv = null;
if (line.startsWith("ERROR")) {
rv = OperationErrorType.GENERAL;
} else if (line.startsWith("CLIENT_ERROR")) {
rv = OperationErrorType.CLIENT;
} else if (line.startsWith("SERVER_ERROR")) {
rv = OperationErrorType.SERVER;
}
return rv;
}
This checks whether the returned line is an error. If validation passes, the following is called:
public final void handleLine(String line) {
if (line.equals("END")) {
getLogger().debug("Get complete!");
if (hasValue) {
getCallback().receivedStatus(END);
} else {
getCallback().receivedStatus(NOT_FOUND);
}
transitionState(OperationState.COMPLETE);
data = null;
} else if (line.startsWith("VALUE ")) {
getLogger().debug("Got line %s", line);
String[] stuff = line.split(" ");
assert stuff[0].equals("VALUE");
currentKey = stuff[1];
currentFlags = Integer.parseInt(stuff[2]);
data = new byte[Integer.parseInt(stuff[3])];
if (stuff.length > 4) {
casValue = Long.parseLong(stuff[4]);
}
readOffset = 0;
hasValue = true;
getLogger().debug("Set read type to data");
setReadType(OperationReadType.DATA);
} else if (line.equals("LOCK_ERROR")) {
getCallback().receivedStatus(LOCK_ERROR);
transitionState(OperationState.COMPLETE);
} else {
assert false : "Unknown line type: " + line;
}
}
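The VALUE branch of handleLine splits the header on spaces into key, flags, byte count and an optional cas value. A small stand-alone sketch of that parsing step follows; the class ValueLineSketch and its field names are hypothetical and chosen only for illustration.

```java
// Hypothetical holder for the fields of a "VALUE <key> <flags> <bytes> [<cas>]" line.
class ValueLineSketch {
    final String key;
    final int flags;
    final int length; // size of the data block that follows the line
    final Long cas;   // null when the line carries no cas field

    ValueLineSketch(String key, int flags, int length, Long cas) {
        this.key = key;
        this.flags = flags;
        this.length = length;
        this.cas = cas;
    }

    static ValueLineSketch parse(String line) {
        String[] parts = line.split(" ");
        if (parts.length < 4 || !parts[0].equals("VALUE")) {
            throw new IllegalArgumentException("Not a VALUE line: " + line);
        }
        Long cas = parts.length > 4 ? Long.valueOf(parts[4]) : null;
        return new ValueLineSketch(parts[1], Integer.parseInt(parts[2]),
                Integer.parseInt(parts[3]), cas);
    }
}
```

The byte count is what lets the reader switch from line mode to data mode: the value may itself contain \r\n, so it has to be read by length, not by delimiter.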
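The line has the form VALUE key flags bytes [cas]. The bytes field is the length of the data block that follows, so the data still has to be read afterwards; the operation therefore calls setReadType(OperationReadType.DATA) and continues in handleRead: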
public final void handleRead(ByteBuffer b) {
assert currentKey != null;
assert data != null;
// This will be the case, because we'll clear them when it's not.
assert readOffset <= data.length : "readOffset is " + readOffset
+ " data.length is " + data.length;
getLogger().debug("readOffset: %d, length: %d", readOffset, data.length);
// If we're not looking for termination, we're still looking for data
if (lookingFor == '\0') {
int toRead = data.length - readOffset;
int available = b.remaining();
toRead = Math.min(toRead, available);
getLogger().debug("Reading %d bytes", toRead);
b.get(data, readOffset, toRead);
readOffset += toRead;
}
// Transition us into a ``looking for \r\n'' kind of state if we've
// read enough and are still in a data state.
if (readOffset == data.length && lookingFor == '\0') {
// The callback is most likely a get callback. If it's not, then
// it‘s a gets callback.
OperationCallback cb = getCallback();
if (cb instanceof GetOperation.Callback) {
GetOperation.Callback gcb = (GetOperation.Callback) cb;
gcb.gotData(currentKey, currentFlags, data);
} else if (cb instanceof GetsOperation.Callback) {
GetsOperation.Callback gcb = (GetsOperation.Callback) cb;
gcb.gotData(currentKey, currentFlags, casValue, data);
} else if (cb instanceof GetlOperation.Callback) {
GetlOperation.Callback gcb = (GetlOperation.Callback) cb;
gcb.gotData(currentKey, currentFlags, casValue, data);
} else if (cb instanceof GetAndTouchOperation.Callback) {
GetAndTouchOperation.Callback gcb = (GetAndTouchOperation.Callback) cb;
gcb.gotData(currentKey, currentFlags, casValue, data);
} else {
throw new ClassCastException("Couldn't convert " + cb
+ "to a relevent op");
}
lookingFor = '\r';
}
// If we're looking for an ending byte, let's go find it.
if (lookingFor != '\0' && b.hasRemaining()) {
do {
byte tmp = b.get();
assert tmp == lookingFor : "Expecting " + lookingFor + ", got "
+ (char) tmp;
switch (lookingFor) {
case '\r':
lookingFor = '\n';
break;
case '\n':
lookingFor = '\0';
break;
default:
assert false : "Looking for unexpected char: " + (char) lookingFor;
}
} while (lookingFor != '\0' && b.hasRemaining());
// Completed the read, reset stuff.
if (lookingFor == '\0') {
currentKey = null;
data = null;
readOffset = 0;
currentFlags = 0;
getLogger().debug("Setting read type back to line.");
setReadType(OperationReadType.LINE);
}
}
}
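Once the data has been read completely, the gotData method of the operation's callback is invoked (the callback of a get, gets, getl or get-and-touch operation).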
handleOperationalTasks
private void handleOperationalTasks() throws IOException {
checkPotentiallyTimedOutConnection();
if (!shutDown && !reconnectQueue.isEmpty()) {
attemptReconnects();
}
if (!retryOps.isEmpty()) {
redistributeOperations(new ArrayList<Operation>(retryOps));
retryOps.clear();
}
handleShutdownQueue();
}
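Finally, the reconnect and redistribution work is carried out.
To close, two sequence diagrams summarize how the IO model is used:
There are two core queues whose roles need to be understood:
addedQueue: every node that has pending reads or writes is added to this queue
inputQueue: every read or write operation for a node is added to that node's queue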
Server management
In spymemcached, a server connection is associated with a MemcachedNode.
Which node is used for a request is determined by the key. When an operation is added, MemcachedConnection calls its addOperation method, which contains:
MemcachedNode primary = locator.getPrimary(key);
The locator then picks the corresponding node:
public MemcachedNode getPrimary(String k) {
return nodes[getServerForKey(k)];
}
private int getServerForKey(String key) {
int rv = (int) (hashAlg.hash(key) % nodes.length);
assert rv >= 0 : "Returned negative key for key " + key;
assert rv < nodes.length : "Invalid server number " + rv + " for key "
+ key;
return rv;
}
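The locator above boils down to "hash the key, take it modulo the number of servers". A minimal sketch of that idea follows. It assumes String.hashCode() as the hash function purely for illustration (the library's hashAlg is configurable), and the class ModLocatorSketch is hypothetical.

```java
class ModLocatorSketch {
    // Map a key to a server index in [0, serverCount).
    static int serverFor(String key, int serverCount) {
        // Treat the 32-bit hash as unsigned so the result is never negative.
        long hash = key.hashCode() & 0xffffffffL;
        return (int) (hash % serverCount);
    }
}
```

The same key always maps to the same index as long as the server count is fixed. When the count changes, most keys map to a different server, which is the usual argument for consistent hashing in deployments whose server list changes often.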
If the node is down, another node is selected:
if (primary.isActive() || failureMode == FailureMode.Retry) {
placeIn = primary;
} else if (failureMode == FailureMode.Cancel) {
o.cancel();
} else {
Iterator<MemcachedNode> i = locator.getSequence(key);
while (placeIn == null && i.hasNext()) {
MemcachedNode n = i.next();
if (n.isActive()) {
placeIn = n;
}
}
}
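The three failure modes lead to three different outcomes when the primary node is down. The sketch below models them with plain indices. It is a simplification under stated assumptions: the class FailoverSketch is hypothetical, the fallback order (walking the following indices) merely stands in for locator.getSequence(key), and returning -1 when no node is active is a choice of this sketch.

```java
class FailoverSketch {
    enum FailureMode { Redistribute, Retry, Cancel }

    // Returns the index of the node to use, or -1 when the operation is
    // cancelled or no active node could be found.
    static int choose(boolean[] active, int primary, FailureMode mode) {
        if (active[primary] || mode == FailureMode.Retry) {
            return primary; // Retry queues on the primary even while it is down
        }
        if (mode == FailureMode.Cancel) {
            return -1;
        }
        // Redistribute: take the first active node after the primary.
        for (int step = 1; step < active.length; step++) {
            int candidate = (primary + step) % active.length;
            if (active[candidate]) {
                return candidate;
            }
        }
        return -1;
    }
}
```

Redistribute favors availability at the cost of keys temporarily living on the "wrong" server, Retry favors consistency of placement, and Cancel fails fast.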
Serialization
Everything stored in memcached is a byte array, so higher-level objects have to be serialized first. That functionality lives in SerializingTranscoder.
public CachedData encode(Object o) {
byte[] b = null;
int flags = 0;
if (o instanceof String) {
b = encodeString((String) o);
if (StringUtils.isJsonObject((String) o)) {
return new CachedData(flags, b, getMaxSize());
}
} else if (o instanceof Long) {
b = tu.encodeLong((Long) o);
flags |= SPECIAL_LONG;
} else if (o instanceof Integer) {
b = tu.encodeInt((Integer) o);
flags |= SPECIAL_INT;
} else if (o instanceof Boolean) {
b = tu.encodeBoolean((Boolean) o);
flags |= SPECIAL_BOOLEAN;
} else if (o instanceof Date) {
b = tu.encodeLong(((Date) o).getTime());
flags |= SPECIAL_DATE;
} else if (o instanceof Byte) {
b = tu.encodeByte((Byte) o);
flags |= SPECIAL_BYTE;
} else if (o instanceof Float) {
b = tu.encodeInt(Float.floatToRawIntBits((Float) o));
flags |= SPECIAL_FLOAT;
} else if (o instanceof Double) {
b = tu.encodeLong(Double.doubleToRawLongBits((Double) o));
flags |= SPECIAL_DOUBLE;
} else if (o instanceof byte[]) {
b = (byte[]) o;
flags |= SPECIAL_BYTEARRAY;
} else {
b = serialize(o);
flags |= SERIALIZED;
}
assert b != null;
if (b.length > compressionThreshold) {
byte[] compressed = compress(b);
if (compressed.length < b.length) {
getLogger().debug("Compressed %s from %d to %d",
o.getClass().getName(), b.length, compressed.length);
b = compressed;
flags |= COMPRESSED;
} else {
getLogger().info("Compression increased the size of %s from %d to %d",
o.getClass().getName(), b.length, compressed.length);
}
}
return new CachedData(flags, b, getMaxSize());
}
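The encode method follows a simple pattern: pick a byte encoding by type, record the type in the flags, then compress large payloads and record that in the flags as well. The sketch below mirrors the pattern for two types only. Everything in it is an assumption made for illustration: the class EncodeSketch, the flag values, the fixed 8-byte long encoding, and GZIP as the compression format.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

class EncodeSketch {
    // Illustrative flag values; the real constants live in the transcoder.
    static final int COMPRESSED = 2;
    static final int SPECIAL_LONG = 3 << 8;

    final int flags;
    final byte[] data;

    EncodeSketch(int flags, byte[] data) {
        this.flags = flags;
        this.data = data;
    }

    static EncodeSketch encode(Object o, int compressionThreshold) {
        byte[] b;
        int flags = 0;
        if (o instanceof String) {
            b = ((String) o).getBytes(StandardCharsets.UTF_8); // strings carry no type flag
        } else if (o instanceof Long) {
            b = ByteBuffer.allocate(8).putLong((Long) o).array();
            flags |= SPECIAL_LONG;
        } else {
            throw new IllegalArgumentException("sketch handles String and Long only");
        }
        if (b.length > compressionThreshold) {
            byte[] compressed = gzip(b);
            if (compressed.length < b.length) { // keep the original if compression did not help
                b = compressed;
                flags |= COMPRESSED;
            }
        }
        return new EncodeSketch(flags, b);
    }

    static byte[] gzip(byte[] in) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }
}
```

The flags travel with the value in memcached, so the decoder can tell from them alone whether to decompress and which type to rebuild.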
Next, here is how serialize handles a general object:
protected byte[] serialize(Object o) {
if (o == null) {
throw new NullPointerException("Can't serialize null");
}
byte[] rv=null;
ByteArrayOutputStream bos = null;
ObjectOutputStream os = null;
try {
bos = new ByteArrayOutputStream();
os = new ObjectOutputStream(bos);
os.writeObject(o);
os.close();
bos.close();
rv = bos.toByteArray();
} catch (IOException e) {
throw new IllegalArgumentException("Non-serializable object", e);
} finally {
CloseUtil.close(os);
CloseUtil.close(bos);
}
return rv;
}
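The serialize method relies on standard Java serialization, so a value read back from the cache needs the matching ObjectInputStream step. Here is a round-trip sketch using try-with-resources; the class JavaSerializationSketch and its deserialize method are hypothetical and are not the library's decode path.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

class JavaSerializationSketch {
    static byte[] serialize(Object o) {
        if (o == null) {
            throw new NullPointerException("Can't serialize null");
        }
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream os = new ObjectOutputStream(bos)) {
            os.writeObject(o);
        } catch (IOException e) {
            // NotSerializableException is an IOException and ends up here
            throw new IllegalArgumentException("Non-serializable object", e);
        }
        return bos.toByteArray();
    }

    static Object deserialize(byte[] in) {
        try (ObjectInputStream is = new ObjectInputStream(new ByteArrayInputStream(in))) {
            return is.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalArgumentException("Cannot deserialize", e);
        }
    }
}
```

Only objects implementing java.io.Serializable survive this path, which is why the typed fast paths for String, Long and the other primitives wrappers are tried first.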
set analysis
private <T> OperationFuture<Boolean> asyncStore(StoreType storeType,
String key, int exp, T value, Transcoder<T> tc) {
CachedData co = tc.encode(value);
final CountDownLatch latch = new CountDownLatch(1);
final OperationFuture<Boolean> rv =
new OperationFuture<Boolean>(key, latch, operationTimeout,
executorService);
Operation op = opFact.store(storeType, key, co.getFlags(), exp,
co.getData(), new StoreOperation.Callback() {
@Override
public void receivedStatus(OperationStatus val) {
rv.set(val.isSuccess(), val);
}
@Override
public void gotData(String key, long cas) {
rv.setCas(cas);
}
@Override
public void complete() {
latch.countDown();
rv.signalComplete();
}
});
rv.setOperation(op);
mconn.enqueueOperation(key, op);
return rv;
}
The set callback picks up the cas counter value in gotData.
Here is the getCas method of OperationFuture:
public Long getCas() {
if (cas == null) {
try {
get();
} catch (InterruptedException e) {
status = new OperationStatus(false, "Interrupted", StatusCode.INTERRUPTED);
Thread.currentThread().isInterrupted();
} catch (ExecutionException e) {
getLogger().warn("Error getting cas of operation", e);
}
}
if (cas == null && status.isSuccess()) {
throw new UnsupportedOperationException("This operation doesn't return"
+ "a cas value.");
}
return cas;
}
It calls the get method:
public T get() throws InterruptedException, ExecutionException {
try {
return get(timeout, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
throw new RuntimeException("Timed out waiting for operation", e);
}
}
public T get(long duration, TimeUnit units) throws InterruptedException,
TimeoutException, ExecutionException {
if (!latch.await(duration, units)) {
// whenever timeout occurs, continuous timeout counter will increase by 1.
MemcachedConnection.opTimedOut(op);
if (op != null) { // op can be null on a flush
op.timeOut();
}
throw new CheckedOperationTimeoutException(
"Timed out waiting for operation", op);
} else {
// continuous timeout counter will be reset
MemcachedConnection.opSucceeded(op);
}
if (op != null && op.hasErrored()) {
throw new ExecutionException(op.getException());
}
if (isCancelled()) {
throw new ExecutionException(new CancellationException("Cancelled"));
}
if (op != null && op.isTimedOut()) {
throw new ExecutionException(new CheckedOperationTimeoutException(
"Operation timed out.", op));
}
/* TODO: re-add assertion that op.getState() == OperationState.COMPLETE */
return objRef.get();
}
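As the code shows, get can only return a result after the complete callback from asyncStore has run and latch.countDown() has executed. For set, the value finally stored in the future is a boolean: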
public void receivedStatus(OperationStatus val) {
rv.set(val.isSuccess(), val);
}
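The hand-off between the IO thread and the caller rests on a CountDownLatch: the callback stores the result and counts the latch down, and get blocks on the latch with a timeout. A stripped-down sketch of that mechanism follows; the class LatchFutureSketch and its two demo methods are hypothetical, and error, cancellation and timeout bookkeeping are left out.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

class LatchFutureSketch<T> {
    private final CountDownLatch latch = new CountDownLatch(1);
    private final AtomicReference<T> value = new AtomicReference<>();

    // IO thread: store the result, then release the waiters.
    void set(T v) { value.set(v); }
    void complete() { latch.countDown(); }

    // Caller thread: block until complete() has run or the timeout expires.
    T get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
        if (!latch.await(timeout, unit)) {
            throw new TimeoutException("Timed out waiting for operation");
        }
        return value.get();
    }

    // Demo: a background thread plays the role of the IO thread.
    static boolean demo() {
        LatchFutureSketch<Boolean> f = new LatchFutureSketch<>();
        new Thread(() -> { f.set(Boolean.TRUE); f.complete(); }).start();
        try {
            return f.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException | TimeoutException e) {
            throw new IllegalStateException(e);
        }
    }

    // Demo: nobody ever completes the future, so get must time out.
    static boolean timesOutWhenNeverCompleted() {
        LatchFutureSketch<Boolean> f = new LatchFutureSketch<>();
        try {
            f.get(10, TimeUnit.MILLISECONDS);
            return false;
        } catch (TimeoutException e) {
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

The order matters: the value is set before the latch is released, so a caller that gets past await always sees the stored result.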
get analysis
@Override
public <T> GetFuture<T> asyncGet(final String key, final Transcoder<T> tc) {
final CountDownLatch latch = new CountDownLatch(1);
final GetFuture<T> rv = new GetFuture<T>(latch, operationTimeout, key,
executorService);
Operation op = opFact.get(key, new GetOperation.Callback() {
private Future<T> val;
@Override
public void receivedStatus(OperationStatus status) {
rv.set(val, status);
}
@Override
public void gotData(String k, int flags, byte[] data) {
assert key.equals(k) : "Wrong key returned";
val =
tcService.decode(tc, new CachedData(flags, data, tc.getMaxSize()));
}
@Override
public void complete() {
latch.countDown();
rv.signalComplete();
}
});
rv.setOperation(op);
mconn.enqueueOperation(key, op);
return rv;
}
The get flow is broadly similar to set; the difference is that the receivedStatus callback stores a Future object:
@Override
public void receivedStatus(OperationStatus status) {
rv.set(val, status);
}
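cas analysis
A cas operation passes in the cas counter value obtained from an earlier gets. If several concurrent callers all pass the same value, at most one of them succeeds and the others fail: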
public <T> OperationFuture<CASResponse>
asyncCAS(String key, long casId, int exp, T value, Transcoder<T> tc) {
CachedData co = tc.encode(value);
final CountDownLatch latch = new CountDownLatch(1);
final OperationFuture<CASResponse> rv =
new OperationFuture<CASResponse>(key, latch, operationTimeout,
executorService);
Operation op = opFact.cas(StoreType.set, key, casId, co.getFlags(), exp,
co.getData(), new StoreOperation.Callback() {
@Override
public void receivedStatus(OperationStatus val) {
if (val instanceof CASOperationStatus) {
rv.set(((CASOperationStatus) val).getCASResponse(), val);
} else if (val instanceof CancelledOperationStatus) {
getLogger().debug("CAS operation cancelled");
} else if (val instanceof TimedOutOperationStatus) {
getLogger().debug("CAS operation timed out");
} else {
throw new RuntimeException("Unhandled state: " + val);
}
}
@Override
public void gotData(String key, long cas) {
rv.setCas(cas);
}
@Override
public void complete() {
latch.countDown();
rv.signalComplete();
}
});
rv.setOperation(op);
mconn.enqueueOperation(key, op);
return rv;
}
The CASOperationStatus object passed to the receivedStatus callback carries the result of the cas:
public enum CASResponse {
/**
* Status indicating that the CAS was successful and the new value is stored
* in the cache.
*/
OK,
/**
* Status indicating the value was not found in the cache (an add operation
* may be issued to store the value).
*/
NOT_FOUND,
/**
* Status indicating the value was found in the cache, but exists with a
* different CAS value than expected. In this case, the value must be
* refetched and the CAS operation tried again.
*/
EXISTS,
/**
* Status indicating there was an error in specifying the arguments for
* the Observe.
*/
OBSERVE_ERROR_IN_ARGS,
/**
* Status indicating the CAS operation succeeded but the value was
* subsequently modified during Observe.
*/
OBSERVE_MODIFIED,
/**
* Status indicating there was a Timeout in the Observe operation.
*/
OBSERVE_TIMEOUT;
}
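An EXISTS response is the signal to re-read the value and try again. The sketch below shows that optimistic retry loop against an in-memory stand-in for the server. All of it is hypothetical: the class CasSketch, its three-value CasResult enum, and the store that keeps a value and a cas id per key.

```java
import java.util.HashMap;
import java.util.Map;

class CasSketch {
    enum CasResult { OK, NOT_FOUND, EXISTS }

    // Per key: {value, casId}. The cas id changes on every successful write.
    private final Map<String, long[]> store = new HashMap<>();

    synchronized void set(String key, long value) {
        long[] cur = store.get(key);
        long nextCas = cur == null ? 1 : cur[1] + 1;
        store.put(key, new long[] {value, nextCas});
    }

    // Returns a copy of {value, casId}, or null if the key is absent.
    synchronized long[] gets(String key) {
        long[] cur = store.get(key);
        return cur == null ? null : cur.clone();
    }

    synchronized CasResult cas(String key, long casId, long value) {
        long[] cur = store.get(key);
        if (cur == null) {
            return CasResult.NOT_FOUND;
        }
        if (cur[1] != casId) {
            return CasResult.EXISTS; // someone else wrote since our gets
        }
        store.put(key, new long[] {value, casId + 1});
        return CasResult.OK;
    }

    // Optimistic increment: re-read and retry whenever another writer got in first.
    boolean increment(String key, int maxAttempts) {
        for (int i = 0; i < maxAttempts; i++) {
            long[] cur = gets(key);
            if (cur == null) {
                return false;
            }
            if (cas(key, cur[1], cur[0] + 1) == CasResult.OK) {
                return true;
            }
        }
        return false;
    }
}
```

Bounding the number of attempts keeps a heavily contended key from turning the retry loop into an unbounded spin.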
Further reading:
https://code.google.com/p/spymemcached/
http://lingqi1818.iteye.com/blog/1096212