首页 > 代码库 > Copycat - command

Copycat - command

// Client side: submit a PutCommand constructed from "foo" and "Hello world!".
client.submit(new PutCommand("foo", "Hello world!"));

 

ServerContext

// Register a handler so that each CommandRequest arriving on this connection is passed to state.command(...).
connection.handler(CommandRequest.class, request -> state.command(request));

 

State.command

从ReserveState开始,server会把command forward给leader;只有leader可以处理command

/**
 * Handles a command request by forwarding it.
 *
 * <p>If {@code context.getLeader()} is {@code null} the request is answered immediately with a
 * {@code NO_LEADER_ERROR} response. Otherwise the request is passed to {@code forward(request)};
 * if forwarding completes exceptionally the caller also receives a {@code NO_LEADER_ERROR} response.
 *
 * @param request the command request to handle
 * @return a future completed with the response, after it has been passed through {@code logResponse}
 */
@Override
public CompletableFuture<CommandResponse> command(CommandRequest request) {
  context.checkThread();
  logRequest(request);

  // No known leader: there is nothing to forward to, so fail right away.
  if (context.getLeader() == null) {
    CommandResponse noLeader = CommandResponse.builder()
      .withStatus(Response.Status.ERROR)
      .withError(CopycatError.Type.NO_LEADER_ERROR)
      .build();
    return CompletableFuture.completedFuture(logResponse(noLeader));
  }

  // Forward the request; any forwarding failure is reported as NO_LEADER_ERROR.
  return this.<CommandRequest, CommandResponse>forward(request)
    .exceptionally(failure -> CommandResponse.builder()
      .withStatus(Response.Status.ERROR)
      .withError(CopycatError.Type.NO_LEADER_ERROR)
      .build())
    .thenApply(this::logResponse);
}

 

LeaderState.command

public CompletableFuture<CommandResponse> command(final CommandRequest request) {    context.checkThread();    logRequest(request);    // Get the client‘s server session. If the session doesn‘t exist, return an unknown session error.    ServerSessionContext session = context.getStateMachine().executor().context().sessions().getSession(request.session());    if (session == null) { //如果session不存在,无法处理该command      return CompletableFuture.completedFuture(logResponse(CommandResponse.builder()        .withStatus(Response.Status.ERROR)        .withError(CopycatError.Type.UNKNOWN_SESSION_ERROR)        .build()));    }    ComposableFuture<CommandResponse> future = new ComposableFuture<>();    sequenceCommand(request, session, future);    return future;  }

sequenceCommand

/**   * Sequences the given command to the log.   */  private void sequenceCommand(CommandRequest request, ServerSessionContext session, CompletableFuture<CommandResponse> future) {    // If the command is LINEARIZABLE and the session‘s current sequence number is less then one prior to the request    // sequence number, queue this request for handling later. We want to handle command requests in the order in which    // they were sent by the client. Note that it‘s possible for the session sequence number to be greater than the request    // sequence number. In that case, it‘s likely that the command was submitted more than once to the    // cluster, and the command will be deduplicated once applied to the state machine.    if (request.sequence() > session.nextRequestSequence()) { //session中的request需要按sequence执行,所以如果request的sequence num大于我们期望的,说明这个request需要等之前的request先执行      // If the request sequence number is more than 1k requests above the last sequenced request, reject the request.      // The client should resubmit a request that fails with a COMMAND_ERROR.      if (request.sequence() - session.getRequestSequence() > MAX_REQUEST_QUEUE_SIZE) { //如果request的sequence大的太多,和当前sequence比,大100以上        future.complete(CommandResponse.builder()          .withStatus(Response.Status.ERROR)          .withError(CopycatError.Type.COMMAND_ERROR) //拒绝该command          .build());      }      // Register the request in the request queue if it‘s not too far ahead of the current sequence number.      else {        session.registerRequest(request.sequence(), () -> applyCommand(request, session, future)); //放入queue等待      }    } else {      applyCommand(request, session, future); //apply该command    }  }

如果request的sequence比期望的大,就会调用

session.registerRequest

ServerSessionContext

  /**
   * Stores a callback in {@code commands} under the given request sequence number.
   *
   * @param sequence the request sequence number used as the key
   * @param runnable the callback to store
   * @return this session context
   */
  ServerSessionContext registerRequest(long sequence, Runnable runnable) {
    this.commands.put(sequence, runnable);
    return this;
  }

可以看到会把sequence id和对应的function注册到commands里面,这里的function就是applyCommand

问题是,这个commands会在什么时候被触发执行?

ServerSessionContext setRequestSequence(long request) {    if (request > this.requestSequence) {      this.requestSequence = request;      // When the request sequence number is incremented, get the next queued request callback and call it.      // This will allow the command request to be evaluated in sequence.      Runnable command = this.commands.remove(nextRequestSequence());      if (command != null) {        command.run();      }    }    return this;  }

在setRequestSequence的时候,

当set的时候,去commands里面看下,是否有下一个request在等待,如果有直接执行掉

 

applyCommand

/**   * Applies the given command to the log.   */  private void applyCommand(CommandRequest request, ServerSessionContext session, CompletableFuture<CommandResponse> future) {    final Command command = request.command();    final long term = context.getTerm();    final long timestamp = System.currentTimeMillis();    final long index;    // Create a CommandEntry and append it to the log.    try (CommandEntry entry = context.getLog().create(CommandEntry.class)) {      entry.setTerm(term)        .setSession(request.session())        .setTimestamp(timestamp)        .setSequence(request.sequence())        .setCommand(command);      index = context.getLog().append(entry); //把CommandEntry写入log      LOGGER.debug("{} - Appended {}", context.getCluster().member().address(), entry);    }    // Replicate the command to followers.    appendCommand(index, future);    // Set the last processed request for the session. This will cause sequential command callbacks to be executed.    session.setRequestSequence(request.sequence()); //更新session的sequence,这里会试图去check session.commands是否有next request  }

appendCommand

/**   * Sends append requests for a command to followers.   */  private void appendCommand(long index, CompletableFuture<CommandResponse> future) {    appender.appendEntries(index).whenComplete((commitIndex, commitError) -> { //appendEntries到该index      context.checkThread();      if (isOpen()) {        if (commitError == null) {          applyCommand(index, future); //如果成功,applyCommand        } else {          future.complete(logResponse(CommandResponse.builder()            .withStatus(Response.Status.ERROR)            .withError(CopycatError.Type.INTERNAL_ERROR)            .build()));        }      }    });  }

applyCommand,函数名不能换换吗

  /**
   * Applies a command to the state machine.
   *
   * <p>The entry at {@code index} is applied through the state machine; when that completes and
   * this state is still open, the result or error is passed to {@code completeOperation}
   * together with a fresh {@code CommandResponse} builder and the future.
   *
   * @param index  the log index of the command entry to apply
   * @param future the future to complete with the command response
   */
  private void applyCommand(long index, CompletableFuture<CommandResponse> future) {
    context.getStateMachine().<ServerStateMachine.Result>apply(index).whenComplete((outcome, failure) -> {
      // Nothing is done once this state has been closed.
      if (!isOpen()) {
        return;
      }
      completeOperation(outcome, CommandResponse.builder(), failure, future);
    });
  }

apply,我收到command首先要把它写到log里面,然后同步给follower,最终,需要去执行command,比如修改状态机里面的值,a=1

 

ServerContext.getStateMachine(),返回

// The ServerStateMachine held by ServerContext (the value getStateMachine() is described as returning).
private ServerStateMachine stateMachine;
 
这里调用ServerStateMachine.apply(index)
调用apply(entry)
调用apply((CommandEntry) entry)
private CompletableFuture<Result> apply(CommandEntry entry) {    final CompletableFuture<Result> future = new CompletableFuture<>();    final ThreadContext context = ThreadContext.currentContextOrThrow(); //这里保留当前thread的引用    // First check to ensure that the session exists.    ServerSessionContext session = executor.context().sessions().getSession(entry.getSession());    // If the session is null, return an UnknownSessionException. Commands applied to the state machine must    // have a session. We ensure that session register/unregister entries are not compacted from the log    // until all associated commands have been cleaned.    if (session == null) { //session不存在      log.release(entry.getIndex());      return Futures.exceptionalFuture(new UnknownSessionException("unknown session: " + entry.getSession()));    }    // If the session is not in an active state, return an UnknownSessionException. Sessions are retained in the    // session registry until all prior commands have been released by the state machine, but new commands can    // only be applied for sessions in an active state.    else if (!session.state().active()) { //session的状态非active      log.release(entry.getIndex());      return Futures.exceptionalFuture(new UnknownSessionException("inactive session: " + entry.getSession()));    }    // If the command‘s sequence number is less than the next session sequence number then that indicates that    // we‘ve received a command that was previously applied to the state machine. Ensure linearizability by    // returning the cached response instead of applying it to the user defined state machine.    else if (entry.getSequence() > 0 && entry.getSequence() < session.nextCommandSequence()) { //已经apply过的entry      // Ensure the response check is executed in the state machine thread in order to ensure the      // command was applied, otherwise there will be a race condition and concurrent modification issues.      
long sequence = entry.getSequence();      // Switch to the state machine thread and get the existing response.      executor.executor().execute(() -> sequenceCommand(sequence, session, future, context)); //直接返回之前apply的结果      return future;    }    // If we‘ve made it this far, the command must have been applied in the proper order as sequenced by the    // session. This should be the case for most commands applied to the state machine.    else {      // Allow the executor to execute any scheduled events.      long index = entry.getIndex();      long sequence = entry.getSequence();      // Calculate the updated timestamp for the command.      long timestamp = executor.timestamp(entry.getTimestamp());      // Execute the command in the state machine thread. Once complete, the CompletableFuture callback will be completed      // in the state machine thread. Register the result in that thread and then complete the future in the caller‘s thread.      ServerCommit commit = commits.acquire(entry, session, timestamp); //这里有个ServerCommitPool的实现,为了避免反复生成ServerCommit对象,直接从pool里面拿一个,用完放回去      executor.executor().execute(() -> executeCommand(index, sequence, timestamp, commit, session, future, context));      // Update the last applied index prior to the command sequence number. This is necessary to ensure queries sequenced      // at this index receive the index of the command.      setLastApplied(index);      // Update the session timestamp and command sequence number. This is done in the caller‘s thread since all      // timestamp/index/sequence checks are done in this thread prior to executing operations on the state machine thread.      session.setTimestamp(timestamp).setCommandSequence(sequence);      return future;    }  }

 

executeCommand
// Excerpt from apply(CommandEntry): acquire a ServerCommit for the entry, then submit
// executeCommand to executor.executor(), passing `context` along as the last argument.
ServerCommit commit = commits.acquire(entry, session, timestamp);
executor.executor().execute(() -> executeCommand(index, sequence, timestamp, commit, session, future, context));

注意这里有两个线程,

一个是context,是

ThreadContext threadContext

用来响应server请求的

还有一个是executor里面的stateContext,用来改变stateMachine的状态的

所以这里是用executor来执行executeCommand,但把ThreadContext传入

/**   * Executes a state machine command.   */  private void executeCommand(long index, long sequence, long timestamp, ServerCommit commit, ServerSessionContext session, CompletableFuture<Result> future, ThreadContext context) {    // Trigger scheduled callbacks in the state machine.    executor.tick(index, timestamp);    // Update the state machine context with the commit index and local server context. The synchronous flag    // indicates whether the server expects linearizable completion of published events. Events will be published    // based on the configured consistency level for the context.    executor.init(commit.index(), commit.time(), ServerStateMachineContext.Type.COMMAND);    // Store the event index to return in the command response.    long eventIndex = session.getEventIndex();    try {      // Execute the state machine operation and get the result.      Object output = executor.executeOperation(commit);      // Once the operation has been applied to the state machine, commit events published by the command.      // The state machine context will build a composite future for events published to all sessions.      executor.commit();      // Store the result for linearizability and complete the command.      Result result = new Result(index, eventIndex, output);      session.registerResult(sequence, result); // 缓存执行结果      context.executor().execute(() -> future.complete(result)); // complete future,表示future执行结束    } catch (Exception e) {      // If an exception occurs during execution of the command, store the exception.      Result result = new Result(index, eventIndex, e);      session.registerResult(sequence, result);      context.executor().execute(() -> future.complete(result));    }  }

 

ServerStateMachineExecutor.tick
根据时间,去触发scheduledTasks中已经到时间的task
 
ServerStateMachineExecutor.init
更新state machine的context
/**
 * ServerStateMachineExecutor: passes the given index, instant and type to {@code context.update}.
 *
 * @param index   the index to set on the state machine context
 * @param instant the instant to set on the state machine context
 * @param type    the context type
 */
void init(long index, Instant instant, ServerStateMachineContext.Type type) {
  this.context.update(index, instant, type);
}

/**
 * ServerStateMachineContext: records the given index and type and sets the clock to the instant.
 *
 * @param index   the new index
 * @param instant the instant to set the clock to
 * @param type    the new context type
 */
void update(long index, Instant instant, Type type) {
  this.index = index;
  this.type = type;
  this.clock.set(instant);
}
 
ServerStateMachineExecutor.executeOperation
<T extends Operation<U>, U> U executeOperation(Commit commit) {    // Get the function registered for the operation. If no function is registered, attempt to    // use a global function if available.    Function function = operations.get(commit.type()); //从operations找到type对应的function    if (function == null) {      // If no operation function was found for the class, try to find an operation function      // registered with a parent class.      for (Map.Entry<Class, Function> entry : operations.entrySet()) {        if (entry.getKey().isAssignableFrom(commit.type())) { //如果注册的type是commit.type的父类          function = entry.getValue();          break;        }      }      // If a parent operation function was found, store the function for future reference.      if (function != null) {        operations.put(commit.type(), function);      }    }    if (function == null) {      throw new IllegalStateException("unknown state machine operation: " + commit.type());    } else {      // Execute the operation. If the operation return value is a Future, await the result,      // otherwise immediately complete the execution future.      try {        return (U) function.apply(commit); //真正执行function      } catch (Exception e) {        throw new ApplicationException(e, "An application error occurred");      }    }  }
 
 
RequestSequence 和 CommandSequence 有什么不同?看看它们分别在什么地方被set和get。
 

RequestSequence

Set

ServerStateMachine

private CompletableFuture<Void> apply(KeepAliveEntry entry) {
// … (the rest of the method is omitted in this excerpt)
  // Update the session keep alive index for log cleaning, then set the session's request
  // sequence number to commandSequence (commandSequence is not defined within this excerpt).
session.setKeepAliveIndex(entry.getIndex()).setRequestSequence(commandSequence);
}

 

LeaderState

private void applyCommand(CommandRequest request, ServerSessionContext session, CompletableFuture<CommandResponse> future) {
// … (the rest of the method is omitted in this excerpt)
  // Set the last processed request for the session. This will cause sequential command callbacks to be executed.
session.setRequestSequence(request.sequence());
}
 

Get

ServerSessionContext.setCommandSequence
// If the request sequence number is less than the applied sequence number, update the request    // sequence number. This is necessary to ensure that if the local server is a follower that is    // later elected leader, its sequences are consistent for commands.    if (sequence > requestSequence) {      // Only attempt to trigger command callbacks if any are registered.      if (!this.commands.isEmpty()) {        // For each request sequence number, a command callback completing the command submission may exist.        for (long i = this.requestSequence + 1; i <= sequence; i++) {          this.requestSequence = i;          Runnable command = this.commands.remove(i);          if (command != null) {            command.run();          }        }      } else {        this.requestSequence = sequence;      }    }

 

LeaderState

/**   * Sequences the given command to the log.   */  private void sequenceCommand(CommandRequest request, ServerSessionContext session, CompletableFuture<CommandResponse> future) {    // If the command is LINEARIZABLE and the session‘s current sequence number is less then one prior to the request    // sequence number, queue this request for handling later. We want to handle command requests in the order in which    // they were sent by the client. Note that it‘s possible for the session sequence number to be greater than the request    // sequence number. In that case, it‘s likely that the command was submitted more than once to the    // cluster, and the command will be deduplicated once applied to the state machine.    if (request.sequence() > session.nextRequestSequence()) {      // If the request sequence number is more than 1k requests above the last sequenced request, reject the request.      // The client should resubmit a request that fails with a COMMAND_ERROR.      if (request.sequence() - session.getRequestSequence() > MAX_REQUEST_QUEUE_SIZE) {

 

CommandSequence

Set

ServerSessionContext.setCommandSequence

// Advance the command sequence number one step at a time up to `sequence`, running the queries
// registered for each sequence number on the way.
for (long next = commandSequence + 1; next <= sequence; next++) {
  commandSequence = next;
  List<Runnable> pending = this.sequenceQueries.remove(commandSequence);
  if (pending == null) {
    continue;
  }
  for (Runnable query : pending) {
    query.run();
  }
  // Empty the list and hand it to queriesPool.
  pending.clear();
  queriesPool.add(pending);
}

 

ServerStateMachine

private CompletableFuture<Result> apply(CommandEntry entry)
// (Excerpt: only the statement that sets the command sequence number is shown.)
// Update the session timestamp and command sequence number. This is done in the caller's thread since all
// timestamp/index/sequence checks are done in this thread prior to executing operations on the state machine thread.
session.setTimestamp(timestamp).setCommandSequence(sequence);

 

Get

LeaderState

sequenceLinearizableQuery, sequenceBoundedLinearizableQuery
/**   * Sequences a bounded linearizable query.   */  private void sequenceBoundedLinearizableQuery(QueryEntry entry, ServerSessionContext session, CompletableFuture<QueryResponse> future) {    // If the query‘s sequence number is greater than the session‘s current sequence number, queue the request for    // handling once the state machine is caught up.    if (entry.getSequence() > session.getCommandSequence()) {      session.registerSequenceQuery(entry.getSequence(), () -> applyQuery(entry, future));    } else {      applyQuery(entry, future);    }  }

 

PassiveState

private void sequenceQuery(QueryEntry entry, ServerSessionContext session, CompletableFuture<QueryResponse> future) {    // If the query‘s sequence number is greater than the session‘s current sequence number, queue the request for    // handling once the state machine is caught up.    if (entry.getSequence() > session.getCommandSequence()) {      session.registerSequenceQuery(entry.getSequence(), () -> indexQuery(entry, future));    } else {      indexQuery(entry, future);    }  }

Copycat - command