首页 > 代码库 > storm 源码笔记

storm 源码笔记

(reify DistributedRPC$Iface

(^String execute
  [this ^String function ^String args]
  (log-debug "Received DRPC request for " function " " args " at " (System/currentTimeMillis))
  (let [id (str (swap! ctr (fn [v] (mod (inc v) 1000000000))))
               ^Semaphore sem (Semaphore. 0)
               req (DRPCRequest. args id)
               ^ConcurrentLinkedQueue queue (acquire-queue request-queues function)]
    (swap! id->start assoc id (current-time-secs))
    (swap! id->sem assoc id sem)
    (swap! id->function assoc id function)
    (swap! id->request assoc id req)
    (.add queue req)
    (log-debug "Waiting for DRPC result for " function " " args " at " (System/currentTimeMillis))
    (.acquire sem)
    (log-debug "Acquired DRPC result for " function " " args " at " (System/currentTimeMillis))
    (let [result (@id->result id)]
      (cleanup id)
      (log-debug "Returning DRPC result for " function " " args " at " (System/currentTimeMillis))
      (if (instance? DRPCExecutionException result)
        (throw result)
          (if (nil? result)
            (throw (DRPCExecutionException. "Request timed out"))
            result)))))

几个关键点:

1. reify:

Essentially, it is a way to
create objects that satisfy any protocol (or implement methods of any interface or
Object). This makes it analogous to anonymous inner classes in Java.(摘自《clojure programming》)

2. ^String

type hint

3. ^String execute

这里的^String是execute方法的返回类型hint

4. 总结

这里的代码其实就是实现了DistributedRPC$Iface接口。这个接口只定义了一个函数,execute,其接收两个参数,类型都是^String,返回类型也为^String。

关于DistributedRPC$Iface接口,参见https://storm.incubator.apache.org/apidocs/。以下是execute的说明:

String execute(String functionName,               String funcArgs)               throws DRPCExecutionException,                      org.apache.thrift.TException
和这里的实现正好吻合。

storm 源码笔记