首页 > 代码库 > spymemcached源码分析

spymemcached源码分析

主题

核心概念

IO模型

服务器管理

序列化

set分析

get分析

cas分析

补充文档



核心概念


spymemcached是memcached客户端的开源java实现,我们学习任何一种软件,首先需要从整体上对该软件有个了解,也就是中心领域模型是什么?我们首先来看下它的几个核心概念以及组成关系。


MemcachedClient:对于使用客户端的程序员来讲,直接构造的就是这个对象,用于所有和memcached相关的所有操作提交。我们可以看到client在构建的时候,会初始化和它相关的对象,并且在创建连接的时候启动IO模型。


public MemcachedClient(ConnectionFactory cf, List<InetSocketAddress> addrs)

    throws IOException {

    if (cf == null) {

      throw new NullPointerException("Connection factory required");

    }

    if (addrs == null) {

      throw new NullPointerException("Server list required");

    }

    if (addrs.isEmpty()) {

      throw new IllegalArgumentException("You must have at least one server to"

          + " connect to");

    }

    if (cf.getOperationTimeout() <= 0) {

      throw new IllegalArgumentException("Operation timeout must be positive.");

    }

    connFactory = cf;

    tcService = new TranscodeService(cf.isDaemon());

    transcoder = cf.getDefaultTranscoder();

    opFact = cf.getOperationFactory();

    assert opFact != null : "Connection factory failed to make op factory";

    mconn = cf.createConnection(addrs);

    assert mconn != null : "Connection factory failed to make a connection";

    operationTimeout = cf.getOperationTimeout();

    authDescriptor = cf.getAuthDescriptor();

    executorService = cf.getListenerExecutorService();

    if (authDescriptor != null) {

      addObserver(this);

    }

  }




MemcachedNode:一个node对应一台memcached服务器节点

从图中我们可以看到node主要实现类为TCPMemcachedNodeImpl.针对ascii和binary协议有2个特定实现。


首先我们分析下TCPMemcachedNodeImpl的数据结构:

  private final SocketAddress socketAddress;//对应服务器的地址

  private final ByteBuffer rbuf;//读缓冲区

  private final ByteBuffer wbuf;//写缓冲区

  protected final BlockingQueue<Operation> writeQ;//读指令度列

  private final BlockingQueue<Operation> readQ;//写指令队列

  private final BlockingQueue<Operation> inputQueue;//指令提交队列

  private final long opQueueMaxBlockTime;//操作队列最大阻塞时间

  private final long authWaitTime;//鉴权等待事件

  private final ConnectionFactory connectionFactory;//连接工厂

  private AtomicInteger reconnectAttempt = new AtomicInteger(1);//重连原子计数器

  private SocketChannel channel;//连接对应的channel

  private int toWrite = 0;//写入数据的总字节数

  protected Operation optimizedOp = null;//当前的op

  private volatile SelectionKey sk = null;//注册在主selector上的sk

  private boolean shouldAuth = false;

  private CountDownLatch authLatch;

  private ArrayList<Operation> reconnectBlocked;//重连阻塞队列

  private long defaultOpTimeout;

  private volatile long lastReadTimestamp = System.nanoTime();

  private MemcachedConnection connection;//持有该node的connection


  // operation Future.get timeout counter

  private final AtomicInteger continuousTimeout = new AtomicInteger(0);



接下来分析下该接口的几个主要方法:


/**

 * Interface defining a connection to a memcached server.

 */

public interface MemcachedNode {


  /**

   * Move all of the operations delivered via addOperation into the internal

   * write queue.该方法将提交的operation提交到内部的write队列中

   */

  void copyInputQueue();


  /**

   * Extract all queued items for this node destructively.

   *该方法会将input队列的数据复制出来,同时销毁input队列

   * This is useful for redistributing items.

   */

  Collection<Operation> destroyInputQueue();


  /**该方法将会将队列中的请求全部取消,并且清空

   * Clear the queue of currently processing operations by either cancelling

   * them or setting them up to be reapplied after a reconnect.

   */

  void setupResend();


  /**

   *该方法将operation中的buf数据提交到node的wbuf中

   * Fill the write buffer with data from the next operations in the queue.

   *

   * @param optimizeGets if true, combine sequential gets into a single

   *          multi-key get

   */

  void fillWriteBuffer(boolean optimizeGets);


  /**

   * Transition the current write item into a read state.

   */

  void transitionWriteItem();


  /**

   * Get the operation at the top of the queue that is requiring input.

   */

  Operation getCurrentReadOp();


  /**

   * Remove the operation at the top of the queue that is requiring input.

   */

  Operation removeCurrentReadOp();


  /**

   * Get the operation at the top of the queue that has information available to

   * write.

   */

  Operation getCurrentWriteOp();


  /**

   * Remove the operation at the top of the queue that has information available

   * to write.

   */

  Operation removeCurrentWriteOp();


  /**

   * True if an operation is available to read.

   */

  boolean hasReadOp();


  /**

   * True if an operation is available to write.

   */

  boolean hasWriteOp();


  /**

   * Add an operation to the queue. Authentication operations should never be

   * added to the queue, but this is not checked.

   */

  void addOp(Operation op);


  /**

   * Insert an operation to the beginning of the queue.

   *

   * This method is meant to be invoked rarely.

   */

  void insertOp(Operation o);


  /**

   * Compute the appropriate selection operations for the channel this

   * MemcachedNode holds to the server.

   */

  int getSelectionOps();


  /**

   * Get the buffer used for reading data from this node.

   */

  ByteBuffer getRbuf();


  /**

   * Get the buffer used for writing data to this node.

   */

  ByteBuffer getWbuf();


  /**

   * Get the SocketAddress of the server to which this node is connected.

   */

  SocketAddress getSocketAddress();


  /**

   * True if this node is <q>active.</q> i.e. is is currently connected and

   * expected to be able to process requests

   */

  boolean isActive();


  /**

   * True if this node is <q>authenticated.</q>

   */

  boolean isAuthenticated();


  /**当前时间距离最后一次读取时间的差值

   * Milliseconds since last successful read.

   */

  long lastReadDelta();


  /**

   * Notify node of successful read.

   *

   * This is used so the node can keep track of any internal debugging or

   * state it cares about on read.

   */

  void completedRead();


  /**

   * Notify this node that it will be reconnecting.

   */

  void reconnecting();


  /**

   * Notify this node that it has reconnected.

   */

  void connected();


  /**

   * Get the current reconnect count.

   */

  int getReconnectCount();


  /**

   * Register a channel with this node.

   */

  void registerChannel(SocketChannel ch, SelectionKey selectionKey);


  /**

   * Set the SocketChannel this node uses.

   */

  void setChannel(SocketChannel to);


  /**

   * Get the SocketChannel for this connection.

   */

  SocketChannel getChannel();


  /**设置selectKey. 

    *ch.register(selector, ops, qa)为初始构建

   * Set the selection key for this node.

   */

  void setSk(SelectionKey to);


  /**

   * Get the selection key from this node.

   */

  SelectionKey getSk();


  /**

   * Get the number of bytes remaining to write.

   */

  int getBytesRemainingToWrite();


  /**

   * Write some bytes and return the number of bytes written.

   *将数据写到node对应的channel中

   * @return the number of bytes written

   * @throws IOException if there‘s a problem writing

   */

  int writeSome() throws IOException;


  /**设置selectKey感兴趣的事件,具体看下实现代码就清楚了

   *

   * Fix up the selection ops on the selection key.

   */

  void fixupOps();


  /**

   * Let the node know that auth is complete. Typically this would mean the node

   * can start processing and accept new operations to its input queue.

   */

  void authComplete();


  /**

   * Tell a node to set up for authentication. Typically this would mean

   * blocking additions to the queue. In a reconnect situation this may mean

   * putting any queued operations on hold to get to an auth complete state.

   */

  void setupForAuth();


  /**

   * Count ‘time out‘ exceptions to drop connections that fail perpetually.

   *

   * @param timedOut

   */

  void setContinuousTimeout(boolean timedOut);


  int getContinuousTimeout();


  MemcachedConnection getConnection();


  void setConnection(MemcachedConnection connection);




MemcachedConnection:一个memcached客户端只有一个connection实例,一个connection包含一组MemcachedNode列表。我们来看下connection的核心数据结构:

/**

   * The number of empty selects we‘ll allow before assuming we may have

   * missed one and should check the current selectors. This generally

   * indicates a bug, but we‘ll check it nonetheless.

   */

  private static final int DOUBLE_CHECK_EMPTY = 256;


  /**

   * The number of empty selects we‘ll allow before blowing up. It‘s too

   * easy to write a bug that causes it to loop uncontrollably. This helps

   * find those bugs and often works around them.

   */

  private static final int EXCESSIVE_EMPTY = 0x1000000;


  /**

   * The default wakeup delay if not overriden by a system property.

   */

  private static final int DEFAULT_WAKEUP_DELAY = 1000;


  /**

   * If an operation gets cloned more than this ceiling, cancel it for

   * safety reasons.

   */

  private static final int MAX_CLONE_COUNT = 100;


  /**

   * If the connection is alread shut down or shutting down.

   */

  protected volatile boolean shutDown = false;


  /**

   * If true, optimization will collapse multiple sequential get ops.

   */

  private final boolean shouldOptimize;


  /**

   * Holds the current {@link Selector} to use.

   */

  protected Selector selector = null;


  /**

   * The {@link NodeLocator} to use for this connection.

   */

  protected final NodeLocator locator;


  /**

   * The configured {@link FailureMode}.

   */

  protected final FailureMode failureMode;


  /**

   * Maximum amount of time to wait between reconnect attempts.

   */

  private final long maxDelay;


  /**

   * Contains the current number of empty select() calls, which could indicate

   * bugs.

   */

  private int emptySelects = 0;


  /**

   * The buffer size that will be used when reading from the server.

   */

  private final int bufSize;


  /**

   * The connection factory to create {@link MemcachedNode}s from.

   */

  private final ConnectionFactory connectionFactory;


  /**

   * AddedQueue is used to track the QueueAttachments for which operations

   * have recently been queued.

   */

  protected final ConcurrentLinkedQueue<MemcachedNode> addedQueue;


  /**

   * reconnectQueue contains the attachments that need to be reconnected.

   * The key is the time at which they are eligible for reconnect.

   */

  private final SortedMap<Long, MemcachedNode> reconnectQueue;


  /**

   * True if not shutting down or shut down.

   */

  protected volatile boolean running = true;


  /**

   * Holds all connection observers that get notified on connection status

   * changes.

   */

  private final Collection<ConnectionObserver> connObservers =

    new ConcurrentLinkedQueue<ConnectionObserver>();


  /**

   * The {@link OperationFactory} to clone or create operations.

   */

  private final OperationFactory opFact;


  /**

   * The threshold for timeout exceptions.

   */

  private final int timeoutExceptionThreshold;


  /**

   * Holds operations that need to be retried.

   */

  private final List<Operation> retryOps;


  /**

   * Holds all nodes that are scheduled for shutdown.

   */

  protected final ConcurrentLinkedQueue<MemcachedNode> nodesToShutdown;


  /**

   * If set to true, a proper check after finish connecting is done to see

   * if the node is not responding but really alive.

   */

  private final boolean verifyAliveOnConnect;


  /**

   * The {@link ExecutorService} to use for callbacks.

   */

  private final ExecutorService listenerExecutorService;


  /**

   * The {@link MetricCollector} to accumulate metrics (or dummy).

   */

  protected final MetricCollector metrics;


  /**

   * The current type of metrics to collect.

   */

  protected final MetricType metricType;


  /**

   * The selector wakeup delay, defaults to 1000ms.

   */

  private final int wakeupDelay;





SerializingTranscoder:memcached做为一个KV服务器,需要将保存的value对象序列化成字节数组,SerializingTranscoder就是如何对不同的对象类型进行转换的工作。

OperationFactory:用于创在不同类型的Operation对象,因为memcached有两种协议ascii和binary,默认采用的是ascii协议,我们这里仅针对ascii协议进行分析,针对ascii协议具体的实现类为:AsciiOperationFactory.

Operation:操作memcached的一个操作。也分为两组ascii和binary.

OperationCallback:操作成功之后的回调,由于spymemcached采用了异步IO的提交模式,所以取回结果需要有callback的实现。


他们之间的关系如下: 


IO模型


spymemcached采用了单线程异步NIO的方式来管理所有的请求和连接。


在IO模型的启动方式如下:构建MemcachedClient->createConnection->start();


MemcachedClient继承了Thread,start方法其实是启动了一个线程。我们再看该线程的run方法:

public void run() {

    while (running) {

      try {

        handleIO();

      } catch (IOException e) {

        logRunException(e);

      } catch (CancelledKeyException e) {

        logRunException(e);

      } catch (ClosedSelectorException e) {

        logRunException(e);

      } catch (IllegalStateException e) {

        logRunException(e);

      } catch (ConcurrentModificationException e) {

        logRunException(e);

      }

    }

    getLogger().info("Shut down memcached client");

  }

所以IO的核心函数为handleIO,我们再来看该方法:

public void handleIO() throws IOException {

    if (shutDown) {

      getLogger().debug("No IO while shut down.");

      return;

    }


    handleInputQueue();

    getLogger().debug("Done dealing with queue.");


    long delay = 1000;

    if (!reconnectQueue.isEmpty()) {

      long now = System.currentTimeMillis();

      long then = reconnectQueue.firstKey();

      delay = Math.max(then - now, 1);

    }

    getLogger().debug("Selecting with delay of %sms", delay);

    assert selectorsMakeSense() : "Selectors don‘t make sense.";

    int selected = selector.select(delay);


    if (shutDown) {

      return;

    } else if (selected == 0 && addedQueue.isEmpty()) {

      handleWokenUpSelector();

    } else if (selector.selectedKeys().isEmpty()) {

      handleEmptySelects();

    } else {

      getLogger().debug("Selected %d, selected %d keys", selected,

        selector.selectedKeys().size());

      emptySelects = 0;


      Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();

      while(iterator.hasNext()) {

        SelectionKey sk = iterator.next();

        handleIO(sk);

        iterator.remove();

      }

    }


    handleOperationalTasks();

  }

这个方法的主要流程有几个过程:

handleInputQueue



 private void handleInputQueue() {

    if (!addedQueue.isEmpty()) {

      getLogger().debug("Handling queue");

      Collection<MemcachedNode> toAdd = new HashSet<MemcachedNode>();

      Collection<MemcachedNode> todo = new HashSet<MemcachedNode>();


      MemcachedNode qaNode;

      while ((qaNode = addedQueue.poll()) != null) {

        todo.add(qaNode);

      }


      for (MemcachedNode node : todo) {

        boolean readyForIO = false;

        if (node.isActive()) {

          if (node.getCurrentWriteOp() != null) {

            readyForIO = true;

            getLogger().debug("Handling queued write %s", node);

          }

        } else {

          toAdd.add(node);

        }

        node.copyInputQueue();

        if (readyForIO) {

          try {

            if (node.getWbuf().hasRemaining()) {

              handleWrites(node);

            }

          } catch (IOException e) {

            getLogger().warn("Exception handling write", e);

            lostConnection(node);

          }

        }

        node.fixupOps();

      }

      addedQueue.addAll(toAdd);

    }

  }


该方法把addedQueue队里中的就绪的node取出来,假如node中已经提交了写请求,则处理之,否则把node重新放回addedQueue队列中。



private void handleIO(final SelectionKey sk)


 private void handleIO(final SelectionKey sk) {

    MemcachedNode node = (MemcachedNode) sk.attachment();


    try {

      getLogger().debug("Handling IO for:  %s (r=%s, w=%s, c=%s, op=%s)", sk,

        sk.isReadable(), sk.isWritable(), sk.isConnectable(),

        sk.attachment());

      if (sk.isConnectable() && belongsToCluster(node)) {

        getLogger().debug("Connection state changed for %s", sk);

        final SocketChannel channel = node.getChannel();

        if (channel.finishConnect()) {

          finishConnect(sk, node);

        } else {

          assert !channel.isConnected() : "connected";

        }

      } else {

        handleReadsAndWrites(sk, node);

      }

    } catch (ClosedChannelException e) {

      if (!shutDown) {

        getLogger().info("Closed channel and not shutting down. Queueing"

            + " reconnect on %s", node, e);

        lostConnection(node);

      }

    } catch (ConnectException e) {

      getLogger().info("Reconnecting due to failure to connect to %s", node, e);

      queueReconnect(node);

    } catch (OperationException e) {

      node.setupForAuth();

      getLogger().info("Reconnection due to exception handling a memcached "

        + "operation on %s. This may be due to an authentication failure.",

        node, e);

      lostConnection(node);

    } catch (Exception e) {

      node.setupForAuth();

      getLogger().info("Reconnecting due to exception on %s", node, e);

      lostConnection(node);

    }

    node.fixupOps();

  }


该方法根据sk目前的状态来决定如何处理事件,例如sk.isConnectable也就是连接刚建立,那么就会调用finishConnect方法,否则则直接调用handleReadsAndWrites


private void finishConnect(final SelectionKey sk, final MemcachedNode node)

    throws IOException {

    if (verifyAliveOnConnect) {

      final CountDownLatch latch = new CountDownLatch(1);

      final OperationFuture<Boolean> rv = new OperationFuture<Boolean>("noop",

        latch, 2500, listenerExecutorService);

      NoopOperation testOp = opFact.noop(new OperationCallback() {

        public void receivedStatus(OperationStatus status) {

          rv.set(status.isSuccess(), status);

        }


        @Override

        public void complete() {

          latch.countDown();

        }

      });


      testOp.setHandlingNode(node);

      testOp.initialize();

      checkState();

      insertOperation(node, testOp);

      node.copyInputQueue();


      boolean done = false;

      if (sk.isValid()) {

        long timeout = TimeUnit.MILLISECONDS.toNanos(

          connectionFactory.getOperationTimeout());


        long stop = System.nanoTime() + timeout;

        while (stop > System.nanoTime()) {

          handleWrites(node);

          handleReads(node);

          if(done = (latch.getCount() == 0)) {

            break;

          }

        }

      }


      if (!done || testOp.isCancelled() || testOp.hasErrored()

        || testOp.isTimedOut()) {

        throw new ConnectException("Could not send noop upon connect! "

          + "This may indicate a running, but not responding memcached "

          + "instance.");

      }

    }


    connected(node);

    addedQueue.offer(node);

    if (node.getWbuf().hasRemaining()) {

      handleWrites(node);

    }

  }


finishConnect方法构建了一条NoopOperation,实际上是一条VersionOperationImpl指令来和服务器通信,假如服务器没响应或者超时,那么就抛异常。如果成功,则将重连的计数器清空。


handleWrites


 private void handleWrites(final MemcachedNode node) throws IOException {

    node.fillWriteBuffer(shouldOptimize);

    boolean canWriteMore = node.getBytesRemainingToWrite() > 0;

    while (canWriteMore) {

      int wrote = node.writeSome();

      metrics.updateHistogram(OVERALL_AVG_BYTES_WRITE_METRIC, wrote);

      node.fillWriteBuffer(shouldOptimize);

      canWriteMore = wrote > 0 && node.getBytesRemainingToWrite() > 0;

    }

  }


这个该方法比较简单,首先node将数据拷贝到wbuf中,然后将wbuf写入到channel中。



handleReads



该函数比较复杂

private void handleReads(final MemcachedNode node) throws IOException {

    Operation currentOp = node.getCurrentReadOp();

    if (currentOp instanceof TapAckOperationImpl) {

      node.removeCurrentReadOp();

      return;

    }


    ByteBuffer rbuf = node.getRbuf();

    final SocketChannel channel = node.getChannel();

    int read = channel.read(rbuf);

    metrics.updateHistogram(OVERALL_AVG_BYTES_READ_METRIC, read);

    if (read < 0) {

      currentOp = handleReadsWhenChannelEndOfStream(currentOp, node, rbuf);

    }


    while (read > 0) {

      getLogger().debug("Read %d bytes", read);

      rbuf.flip();

      while (rbuf.remaining() > 0) {

        if (currentOp == null) {

          throw new IllegalStateException("No read operation.");

        }


        long timeOnWire =

          System.nanoTime() - currentOp.getWriteCompleteTimestamp();

        metrics.updateHistogram(OVERALL_AVG_TIME_ON_WIRE_METRIC,

          (int)(timeOnWire / 1000));

        metrics.markMeter(OVERALL_RESPONSE_METRIC);

        synchronized(currentOp) {

          readBufferAndLogMetrics(currentOp, rbuf, node);

        }


        currentOp = node.getCurrentReadOp();

      }

      rbuf.clear();

      read = channel.read(rbuf);

      node.completedRead();

    }

  }


假如没有数据可读,则调用:

private Operation handleReadsWhenChannelEndOfStream(final Operation currentOp,

    final MemcachedNode node, final ByteBuffer rbuf) throws IOException {

    if (currentOp instanceof TapOperation) {

      currentOp.getCallback().complete();

      ((TapOperation) currentOp).streamClosed(OperationState.COMPLETE);


      getLogger().debug("Completed read op: %s and giving the next %d bytes",

        currentOp, rbuf.remaining());

      Operation op = node.removeCurrentReadOp();

      assert op == currentOp : "Expected to pop " + currentOp + " got " + op;

      return node.getCurrentReadOp();

    } else {

      throw new IOException("Disconnected unexpected, will reconnect.");

    }

  }


该方法调用operation回调函数的complete方法。


假如有数据可读,则调用readBufferAndLogMetrics方法读取:

private void readBufferAndLogMetrics(final Operation currentOp,

    final ByteBuffer rbuf, final MemcachedNode node) throws IOException {

    currentOp.readFromBuffer(rbuf);

    if (currentOp.getState() == OperationState.COMPLETE) {

      getLogger().debug("Completed read op: %s and giving the next %d "

        + "bytes", currentOp, rbuf.remaining());

      Operation op = node.removeCurrentReadOp();

      assert op == currentOp : "Expected to pop " + currentOp + " got "

        + op;


      if (op.hasErrored()) {

        metrics.markMeter(OVERALL_RESPONSE_FAIL_METRIC);

      } else {

        metrics.markMeter(OVERALL_RESPONSE_SUCC_METRIC);

      }

    } else if (currentOp.getState() == OperationState.RETRY) {

      handleRetryInformation(currentOp.getErrorMsg());

      getLogger().debug("Reschedule read op due to NOT_MY_VBUCKET error: "

        + "%s ", currentOp);

      ((VBucketAware) currentOp).addNotMyVbucketNode(

        currentOp.getHandlingNode());

      Operation op = node.removeCurrentReadOp();

      assert op == currentOp : "Expected to pop " + currentOp + " got "

        + op;


      retryOps.add(currentOp);

      metrics.markMeter(OVERALL_RESPONSE_RETRY_METRIC);

    }

  }


该方法最核心的一句为:currentOp.readFromBuffer(rbuf);


我们来看下该函数的实现:

public void readFromBuffer(ByteBuffer data) throws IOException {

    // Loop while there‘s data remaining to get it all drained.

    while (getState() != OperationState.COMPLETE && data.remaining() > 0) {

      if (readType == OperationReadType.DATA) {

        handleRead(data);

      } else {

        int offset = -1;

        for (int i = 0; data.remaining() > 0; i++) {

          byte b = data.get();

          if (b == ‘\r‘) {

            foundCr = true;

          } else if (b == ‘\n‘) {

            assert foundCr : "got a \\n without a \\r";

            offset = i;

            foundCr = false;

            break;

          } else {

            assert !foundCr : "got a \\r without a \\n";

            byteBuffer.write(b);

          }

        }

        if (offset >= 0) {

          String line = new String(byteBuffer.toByteArray(), CHARSET);

          byteBuffer.reset();

          OperationErrorType eType = classifyError(line);

          if (eType != null) {

            errorMsg = line.getBytes();

            handleError(eType, line);

          } else {

            handleLine(line);

          }

        }

      }

    }

  }


我们可以看到ascii协议其实就是文本协议,数据的读取是根据\r\n来分割的。

读取一行之后先进行校验:

OperationErrorType classifyError(String line) {

    OperationErrorType rv = null;

    if (line.startsWith("ERROR")) {

      rv = OperationErrorType.GENERAL;

    } else if (line.startsWith("CLIENT_ERROR")) {

      rv = OperationErrorType.CLIENT;

    } else if (line.startsWith("SERVER_ERROR")) {

      rv = OperationErrorType.SERVER;

    }

    return rv;

  }

判断返回的数据是否正确。如果校验成功,则调用:

public final void handleLine(String line) {

    if (line.equals("END")) {

      getLogger().debug("Get complete!");

      if (hasValue) {

        getCallback().receivedStatus(END);

      } else {

        getCallback().receivedStatus(NOT_FOUND);

      }

      transitionState(OperationState.COMPLETE);

      data = http://www.mamicode.com/null;

    } else if (line.startsWith("VALUE ")) {

      getLogger().debug("Got line %s", line);

      String[] stuff = line.split(" ");

      assert stuff[0].equals("VALUE");

      currentKey = stuff[1];

      currentFlags = Integer.parseInt(stuff[2]);

      data = http://www.mamicode.com/new byte[Integer.parseInt(stuff[3])];

      if (stuff.length > 4) {

        casValue = http://www.mamicode.com/Long.parseLong(stuff[4]);

      }

      readOffset = 0;

      hasValue = http://www.mamicode.com/true;

      getLogger().debug("Set read type to data");

      setReadType(OperationReadType.DATA);

    } else if (line.equals("LOCK_ERROR")) {

      getCallback().receivedStatus(LOCK_ERROR);

      transitionState(OperationState.COMPLETE);

    } else {

      assert false : "Unknown line type: " + line;

    }

  }


line的格式为:VALUE key flags data,data比较长所以后面还得继续读取,      setReadType(OperationReadType.DATA);


public final void handleRead(ByteBuffer b) {

    assert currentKey != null;

    assert data != null;

    // This will be the case, because we‘ll clear them when it‘s not.

    assert readOffset <= data.length : "readOffset is " + readOffset

        + " data.length is " + data.length;


    getLogger().debug("readOffset: %d, length: %d", readOffset, data.length);

    // If we‘re not looking for termination, we‘re still looking for data

    if (lookingFor == ‘\0‘) {

      int toRead = data.length - readOffset;

      int available = b.remaining();

      toRead = Math.min(toRead, available);

      getLogger().debug("Reading %d bytes", toRead);

      b.get(data, readOffset, toRead);

      readOffset += toRead;

    }

    // Transition us into a ``looking for \r\n‘‘ kind of state if we‘ve

    // read enough and are still in a data state.

    if (readOffset == data.length && lookingFor == ‘\0‘) {

      // The callback is most likely a get callback. If it‘s not, then

      // it‘s a gets callback.

      OperationCallback cb = getCallback();

      if (cb instanceof GetOperation.Callback) {

        GetOperation.Callback gcb = (GetOperation.Callback) cb;

        gcb.gotData(currentKey, currentFlags, data);

      } else if (cb instanceof GetsOperation.Callback) {

        GetsOperation.Callback gcb = (GetsOperation.Callback) cb;

        gcb.gotData(currentKey, currentFlags, casValue, data);

      } else if (cb instanceof GetlOperation.Callback) {

        GetlOperation.Callback gcb = (GetlOperation.Callback) cb;

        gcb.gotData(currentKey, currentFlags, casValue, data);

      } else if (cb instanceof GetAndTouchOperation.Callback) {

        GetAndTouchOperation.Callback gcb = (GetAndTouchOperation.Callback) cb;

        gcb.gotData(currentKey, currentFlags, casValue, data);

      } else {

        throw new ClassCastException("Couldn‘t convert " + cb

            + "to a relevent op");

      }

      lookingFor = ‘\r‘;

    }

    // If we‘re looking for an ending byte, let‘s go find it.

    if (lookingFor != ‘\0‘ && b.hasRemaining()) {

      do {

        byte tmp = b.get();

        assert tmp == lookingFor : "Expecting " + lookingFor + ", got "

            + (char) tmp;

        switch (lookingFor) {

        case ‘\r‘:

          lookingFor = ‘\n‘;

          break;

        case ‘\n‘:

          lookingFor = ‘\0‘;

          break;

        default:

          assert false : "Looking for unexpected char: " + (char) lookingFor;

        }

      } while (lookingFor != ‘\0‘ && b.hasRemaining());

      // Completed the read, reset stuff.

      if (lookingFor == ‘\0‘) {

        currentKey = null;

        data = http://www.mamicode.com/null;

        readOffset = 0;

        currentFlags = 0;

        getLogger().debug("Setting read type back to line.");

        setReadType(OperationReadType.LINE);

      }

    }

  }


然后把data读完之后,调用相应GetlOperation指令callback的gotData的方法。




handleOperationalTasks


private void handleOperationalTasks() throws IOException {

    checkPotentiallyTimedOutConnection();


    if (!shutDown && !reconnectQueue.isEmpty()) {

      attemptReconnects();

    }


    if (!retryOps.isEmpty()) {

      redistributeOperations(new ArrayList<Operation>(retryOps));

      retryOps.clear();

    }


    handleShutdownQueue();

  }



最后进行重连和重新负载均衡的操作。




最后我们用两幅时序图来总结IO模型的使用流程:






这里有2个核心的queue需要搞明白他们的作用:

addQueue:每次操作有读写的node将会添加到该队列中

inputQueue:node相关的读写operation都会添加到该队列中





服务器管理



在spymemcached中,一个服务器连接是和MemcachedNode关联的。


我们在使用过程中,如何找到指定的node是由key所决定的。我们来看相应的函数,当我们添加一个operation的时候,memcachedConnection会调用,addOperation方法:

MemcachedNode primary = locator.getPrimary(key);

locator将会选取对应的node:

public MemcachedNode getPrimary(String k) {

    return nodes[getServerForKey(k)];

  }


private int getServerForKey(String key) {

    int rv = (int) (hashAlg.hash(key) % nodes.length);

    assert rv >= 0 : "Returned negative key for key " + key;

    assert rv < nodes.length : "Invalid server number " + rv + " for key "

        + key;

    return rv;

  }



假如node挂了,将会进行重新选择。


if (primary.isActive() || failureMode == FailureMode.Retry) {

      placeIn = primary;

    } else if (failureMode == FailureMode.Cancel) {

      o.cancel();

    } else {

      Iterator<MemcachedNode> i = locator.getSequence(key);

      while (placeIn == null && i.hasNext()) {

        MemcachedNode n = i.next();

        if (n.isActive()) {

          placeIn = n;

        }

      }






序列化


因为存入到memcached的数据都是字节数组,所以上层对象需要进行序列化处理,该功能在SerializingTranscoder中。


public CachedData encode(Object o) {

    byte[] b = null;

    int flags = 0;

    if (o instanceof String) {

      b = encodeString((String) o);

      if (StringUtils.isJsonObject((String) o)) {

        return new CachedData(flags, b, getMaxSize());

      }

    } else if (o instanceof Long) {

      b = tu.encodeLong((Long) o);

      flags |= SPECIAL_LONG;

    } else if (o instanceof Integer) {

      b = tu.encodeInt((Integer) o);

      flags |= SPECIAL_INT;

    } else if (o instanceof Boolean) {

      b = tu.encodeBoolean((Boolean) o);

      flags |= SPECIAL_BOOLEAN;

    } else if (o instanceof Date) {

      b = tu.encodeLong(((Date) o).getTime());

      flags |= SPECIAL_DATE;

    } else if (o instanceof Byte) {

      b = tu.encodeByte((Byte) o);

      flags |= SPECIAL_BYTE;

    } else if (o instanceof Float) {

      b = tu.encodeInt(Float.floatToRawIntBits((Float) o));

      flags |= SPECIAL_FLOAT;

    } else if (o instanceof Double) {

      b = tu.encodeLong(Double.doubleToRawLongBits((Double) o));

      flags |= SPECIAL_DOUBLE;

    } else if (o instanceof byte[]) {

      b = (byte[]) o;

      flags |= SPECIAL_BYTEARRAY;

    } else {

      b = serialize(o);

      flags |= SERIALIZED;

    }

    assert b != null;

    if (b.length > compressionThreshold) {

      byte[] compressed = compress(b);

      if (compressed.length < b.length) {

        getLogger().debug("Compressed %s from %d to %d",

            o.getClass().getName(), b.length, compressed.length);

        b = compressed;

        flags |= COMPRESSED;

      } else {

        getLogger().info("Compression increased the size of %s from %d to %d",

            o.getClass().getName(), b.length, compressed.length);

      }

    }

    return new CachedData(flags, b, getMaxSize());

  }


然后我们看下对object的serialize处理:

 protected byte[] serialize(Object o) {

    if (o == null) {

      throw new NullPointerException("Can‘t serialize null");

    }

    byte[] rv=null;

    ByteArrayOutputStream bos = null;

    ObjectOutputStream os = null;

    try {

      bos = new ByteArrayOutputStream();

      os = new ObjectOutputStream(bos);

      os.writeObject(o);

      os.close();

      bos.close();

      rv = bos.toByteArray();

    } catch (IOException e) {

      throw new IllegalArgumentException("Non-serializable object", e);

    } finally {

      CloseUtil.close(os);

      CloseUtil.close(bos);

    }

    return rv;

  }











set分析


private <T> OperationFuture<Boolean> asyncStore(StoreType storeType,

      String key, int exp, T value, Transcoder<T> tc) {

    CachedData co = tc.encode(value);

    final CountDownLatch latch = new CountDownLatch(1);

    final OperationFuture<Boolean> rv =

      new OperationFuture<Boolean>(key, latch, operationTimeout,

      executorService);

    Operation op = opFact.store(storeType, key, co.getFlags(), exp,

        co.getData(), new StoreOperation.Callback() {

            @Override

            public void receivedStatus(OperationStatus val) {

              rv.set(val.isSuccess(), val);

            }

            @Override

            public void gotData(String key, long cas) {

              rv.setCas(cas);

            }


            @Override

            public void complete() {

              latch.countDown();

              rv.signalComplete();

            }

          });

    rv.setOperation(op);

    mconn.enqueueOperation(key, op);

    return rv;

  }



set的回调函数在goData的时候获取了cas计数器的值。

我们来看下OperationFuture的getCas的方法:


public Long getCas() {

    if (cas == null) {

      try {

        get();

      } catch (InterruptedException e) {

        status = new OperationStatus(false, "Interrupted", StatusCode.INTERRUPTED);

        Thread.currentThread().isInterrupted();

      } catch (ExecutionException e) {

        getLogger().warn("Error getting cas of operation", e);

      }

    }

    if (cas == null && status.isSuccess()) {

      throw new UnsupportedOperationException("This operation doesn‘t return"

          + "a cas value.");

    }

    return cas;

  }


调用了get方法:

public T get() throws InterruptedException, ExecutionException {

    try {

      return get(timeout, TimeUnit.MILLISECONDS);

    } catch (TimeoutException e) {

      throw new RuntimeException("Timed out waiting for operation", e);

    }

  }



 public T get(long duration, TimeUnit units) throws InterruptedException,

      TimeoutException, ExecutionException {

    if (!latch.await(duration, units)) {

      // whenever timeout occurs, continuous timeout counter will increase by 1.

      MemcachedConnection.opTimedOut(op);

      if (op != null) { // op can be null on a flush

        op.timeOut();

      }

      throw new CheckedOperationTimeoutException(

          "Timed out waiting for operation", op);

    } else {

      // continuous timeout counter will be reset

      MemcachedConnection.opSucceeded(op);

    }

    if (op != null && op.hasErrored()) {

      throw new ExecutionException(op.getException());

    }

    if (isCancelled()) {

      throw new ExecutionException(new CancellationException("Cancelled"));

    }

    if (op != null && op.isTimedOut()) {

      throw new ExecutionException(new CheckedOperationTimeoutException(

          "Operation timed out.", op));

    }


    /* TODO: re-add assertion that op.getState() == OperationState.COMPLETE */


    return objRef.get();

  }

这里可以看到能否get到结果是由asyncStore中的complete方法被调用后              latch.countDown();执行后才能获得结果。set最后塞进去的是个boolean值。

public void receivedStatus(OperationStatus val) {

              rv.set(val.isSuccess(), val);

            }



get分析


@Override

  public <T> GetFuture<T> asyncGet(final String key, final Transcoder<T> tc) {


    final CountDownLatch latch = new CountDownLatch(1);

    final GetFuture<T> rv = new GetFuture<T>(latch, operationTimeout, key,

      executorService);

    Operation op = opFact.get(key, new GetOperation.Callback() {

      private Future<T> val;


      @Override

      public void receivedStatus(OperationStatus status) {

        rv.set(val, status);

      }


      @Override

      public void gotData(String k, int flags, byte[] data) {

        assert key.equals(k) : "Wrong key returned";

        val =

            tcService.decode(tc, new CachedData(flags, data, tc.getMaxSize()));

      }


      @Override

      public void complete() {

        latch.countDown();

        rv.signalComplete();

      }

    });

    rv.setOperation(op);

    mconn.enqueueOperation(key, op);

    return rv;

  }


get的流程大致和get相似,只是回调receivedStatus塞进去是个future对象。

  @Override

      public void receivedStatus(OperationStatus status) {

        rv.set(val, status);

      }



cas分析


由于我们做cas操作的时候会传入get获取的cas计数器值,如果并发的时候大家都传入一样的值,cas就有可能失败:

public <T> OperationFuture<CASResponse>

  asyncCAS(String key, long casId, int exp, T value, Transcoder<T> tc) {

    CachedData co = tc.encode(value);

    final CountDownLatch latch = new CountDownLatch(1);

    final OperationFuture<CASResponse> rv =

      new OperationFuture<CASResponse>(key, latch, operationTimeout,

      executorService);

    Operation op = opFact.cas(StoreType.set, key, casId, co.getFlags(), exp,

        co.getData(), new StoreOperation.Callback() {

            @Override

            public void receivedStatus(OperationStatus val) {

              if (val instanceof CASOperationStatus) {

                rv.set(((CASOperationStatus) val).getCASResponse(), val);

              } else if (val instanceof CancelledOperationStatus) {

                getLogger().debug("CAS operation cancelled");

              } else if (val instanceof TimedOutOperationStatus) {

                getLogger().debug("CAS operation timed out");

              } else {

                throw new RuntimeException("Unhandled state: " + val);

              }

            }

            @Override

            public void gotData(String key, long cas) {

              rv.setCas(cas);

            }

            @Override

            public void complete() {

              latch.countDown();

              rv.signalComplete();

            }

          });

    rv.setOperation(op);

    mconn.enqueueOperation(key, op);

    return rv;

  }


receivedStatus回调的CASOperationStatus对象就可以获取cas的结果:

public enum CASResponse {

  /**

   * Status indicating that the CAS was successful and the new value is stored

   * in the cache.

   */

  OK,

  /**

   * Status indicating the value was not found in the cache (an add operation

   * may be issued to store the value).

   */

  NOT_FOUND,

  /**

   * Status indicating the value was found in the cache, but exists with a

   * different CAS value than expected. In this case, the value must be

   * refetched and the CAS operation tried again.

   */

  EXISTS,

  /**

   * Status indicating there was an error in specifying the arguments for

   * the Observe.

   */

  OBSERVE_ERROR_IN_ARGS,

  /**

   * Status indicating the CAS operation succeeded but the value was

   * subsequently modified during Observe.

   */

  OBSERVE_MODIFIED,

  /**

   * Status indicating there was a Timeout in the Observe operation.

   */

  OBSERVE_TIMEOUT;

}



补充文档:

https://code.google.com/p/spymemcached/

http://lingqi1818.iteye.com/blog/1096212