首页 > 代码库 > AKKA-HTTP DSL源码解读

AKKA-HTTP DSL源码解读

  • 示例代码
  • Directive的创建
    • 调用堆栈
    • 一个分支
    • 回到主干
  • 深入解读
  • 关于函数式编程的思考

AKKA-HTTP的DSL设计简洁而优雅,让人禁不住一窥其内部的实现,内部浓浓的函数式编程风格也是学习函数式编程很好参考,因此这部分代码非常值得仔细咀嚼一番。我们先看一个DSL的应用代码示例:

本文原文出处: http://blog.csdn.net/bluishglc/article/details/53215338 转载请注明出处。

示例代码

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.{Directive0, Route}
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import scala.io.StdIn

object WebServer {
    def main(args: Array[String]) {
        implicit val system = ActorSystem("my-system")
        implicit val materializer = ActorMaterializer()
        // needed for the future flatMap/onComplete in the end
        implicit val executionContext = system.dispatcher
        val route =
            path("hello") {
                get {
                    complete("Say hello to akka-http")
                }
            }
        val bindingFuture = Http().bindAndHandle(route, "localhost", 8080)
        println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
        StdIn.readLine() // let it run until user presses return
        bindingFuture
            .flatMap(_.unbind()) // trigger unbinding from the port
            .onComplete(_ => system.terminate()) // and shutdown when done
    }
}

我们主要关注的是变量route的定义,它用AKKA-HTTP提供的DSL定义了一个非常简单的路由,作为一个使用者,你不得不感叹这套DSL定义地非常优雅。我们的代码走读就要从这个route的定义开始了。为了更容易理解,我们把route的定义改写成更加直白的形式,两种形式是完全等价的:

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.{Directive0, Route, StandardRoute}
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer

import scala.io.StdIn

object WebServer {
    def main(args: Array[String]) {
        implicit val system = ActorSystem("my-system")
        implicit val materializer = ActorMaterializer()
        // needed for the future flatMap/onComplete in the end
        implicit val executionContext = system.dispatcher

        val complete1: StandardRoute = complete("Say hello to akka-http");
        val route1: Route = get(complete1)
        val path1: Directive0 = path("hello")
        val route: Route = path1.apply(route1)

        val bindingFuture = Http().bindAndHandle(route, "localhost", 8080)
        println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
        StdIn.readLine() // let it run until user presses return
        bindingFuture
            .flatMap(_.unbind()) // trigger unbinding from the port
            .onComplete(_ => system.terminate()) // and shutdown when done
    }
}

Directive的创建

调用堆栈

让我们从get方法说起,get不是一个接受StandardRoute类型参数的函数,而是一个无参数的函数,它返回Directive0,而后面的这个StandardRoute类型参列表实际上是Directive0apply方法的参数!get这个Directive的创建过程其实是非常曲折的!它的调用关系是:akka.http.scaladsl.server.directives.MethodDirectives#get -> akka.http.scaladsl.server.directives.MethodDirectives#_get -> akka.http.scaladsl.server.directives.MethodDirectives#method -> akka.http.scaladsl.server.directives.MethodDirectives#extractMethod -> akka.http.scaladsl.server.directives.MethodDirectives#_extractMethod -> akka.http.scaladsl.server.directives.BasicDirectives#extract -> akka.http.scaladsl.server.directives.BasicDirectives#textract -> akka.http.scaladsl.server.Directive#apply, 为了便于阅读,我们先把_extractMethodextracttextractapply这些方法放到一起:

// akka.http.scaladsl.server.directives.MethodDirectives#get
def get: Directive0 = _get

// akka.http.scaladsl.server.directives.MethodDirectives#_get
private val _get    : Directive0 = method(GET)

// akka.http.scaladsl.server.directives.MethodDirectives#method
def method(httpMethod: HttpMethod): Directive0 =
    extractMethod.flatMap[Unit] {
        case `httpMethod` ? pass
        case _            ? reject(MethodRejection(httpMethod))
    } & cancelRejections(classOf[MethodRejection])

// akka.http.scaladsl.server.directives.MethodDirectives#extractMethod
def extractMethod: Directive1[HttpMethod] = _extractMethod

// akka.http.scaladsl.server.directives.MethodDirectives#_extractMethod
private val _extractMethod: Directive1[HttpMethod] =
    BasicDirectives.extract(_.request.method) //ctx=>ctx.request.method

// akka.http.scaladsl.server.directives.BasicDirectives#extract    
def extract[T](f: RequestContext ? T): Directive1[T] =
    textract(ctx ? Tuple1(f(ctx)))    

// akka.http.scaladsl.server.directives.BasicDirectives#textract    
def textract[L: Tuple](f: RequestContext ? L): Directive[L] =
    Directive { inner ? ctx ? inner(f(ctx))(ctx) } 

// akka.http.scaladsl.server.Directive#apply
def apply[T: Tuple](f: (T ? Route) ? Route): Directive[T] =
    new Directive[T] { def tapply(inner: T ? Route) = f(inner) }    

// akka.http.scaladsl.server.Directive#addByNameNullaryApply
implicit def addByNameNullaryApply(directive: Directive0): (? Route) ? Route =
r ? directive.tapply(_ ? r) //t=>r
//t没有用到,使用_忽略参数,所以我们可以看到,这个隐式转换要做的工作是:把一个传递进来的路由变成了它的内部路由!这个传递进来的路由就是complete1!!

一个分支

当代码代码在执行到akka.http.scaladsl.server.directives.MethodDirectives#method时还有一个分支,即里面的flatMap方法,它是通过一个隐式转换,为生成的Directive添加逻辑,相应的调用堆栈如下:akka.http.scaladsl.server.directives.MethodDirectives#method -> akka.http.scaladsl.server.Directive.SingleValueModifiers#flatMap -> akka.http.scaladsl.server.Directive#tflatMap

// akka.http.scaladsl.server.directives.MethodDirectives#method
def method(httpMethod: HttpMethod): Directive0 =
    extractMethod.flatMap[Unit] {
        case `httpMethod` ? pass
        case _            ? reject(MethodRejection(httpMethod))
    } & cancelRejections(classOf[MethodRejection])

// akka.http.scaladsl.server.Directive.SingleValueModifiers#flatMap
def flatMap[R: Tuple](f: T ? Directive[R]): Directive[R] =
  underlying.tflatMap { case Tuple1(value) ? f(value) }

// akka.http.scaladsl.server.Directive#tflatMap
  def tflatMap[R: Tuple](f: L ? Directive[R]): Directive[R] =
    Directive[R] { inner ? tapply { values ? f(values) tapply inner } }

上面代码的逻辑相对比较清晰,隐式转换为Direcitve添加了一个方法flatMap,这个方法构造了一个偏函数,这个偏函数接受一个Tuple,提取出其中的值传递给传入的函数参数f进行调用。函数参数f的工作判断根据传入的值(HTTP元素)是否与本Directive创建时声明的HTTP元素一致,如果一致,让请求通过(不做任何事情),否则,拒绝请求(尝试新的路由)。

回到主干

主调用堆栈里一连串的方法调用的解释是:我们现在要针对HTTP请求的方法(Get/Put/Post等等)创建一个Directive, _extractMethod告诉了程序如何从请求上下文里获取当前“HTTP请求方法”的方法。extract做的工作是:因为通行的创建Directive的方法要求提供的是一个Tuple,所以extract只是简单地把获取的“HTTP请求方法”封装成一个Tupletextract开始真正地去创建这个Directive,一个Directive最主要的成分是它会包裹一个内部的路由(inner route),所谓的“路由”是指这样一个函数:它接受一个请求上下文然后产出一个结果(或完成或拒绝或是失败等等),这个函数,也可以叫”处理过程”被称之为“路由”。那什么是”内部路由”呢?”内部路由”实际上就是从当前这个Directive携带的一个些“值”(这些“值”就是一些HTTP元素,比如:HttpMethod,HttpHeader等,这些值会封装在一个Tuple里),“内部路由”就是以这个Directive的内部HTTP元素为“依据”,从接受请求到得到一个路由结果的操作!对应到代码上,就是在构建Directive实例时传递的函数参数inner, 即类型是T ? Route。一个非常值得注意的地方是:直到我们创建出这个Directive实例后,内部路由的具体实现(即inner的“值”)可是不直到的,它还是以一个函数参数的形式存在的。

让我们从_extractMethod开始,逐一解读:


代码:

// akka.http.scaladsl.server.directives.MethodDirectives#_extractMethod
private val _extractMethod: Directive1[HttpMethod] =
    BasicDirectives.extract(_.request.method)
//上述代码等同于:
private val _extractMethod: Directive1[HttpMethod] =
    BasicDirectives.extract(ctx=>ctx.request.method)

解读:

我将试图创建一个面向HttpMethodDirective1,现在有一个现成的方法,它只要我提供一个函数就可以直接创建出这个Directive了,这个函数要能从一个RequestContext里面提取出一个HttpMethod。针对这个要求传入的函数,我给了它我的实现,即:从当前的RequestContext中取出当前的request中的method即可。至此,我的工作完成,了解更多细节请参考我委托的函数。


代码:

// akka.http.scaladsl.server.directives.BasicDirectives#extract    
def extract[T](f: RequestContext ? T): Directive1[T] =
    textract(ctx ? Tuple1(f(ctx)))   

解读:

我将试图创建一个面向任意HTTP元素的Directive1,首先,我需要你提供给我一个函数,这个函数要能从一个RequestContext里面提取出对应的HTTP元素,我将利用这个函数得到这个HTTP元素,然后,现在有一个现成的方法,它只要我提供一个函数就可以直接创建出这个Directive了,这个函数要能从一个RequestContext里面提取出一个Tuple1。针对这个要求传入的函数,我给了它我的实现,即:利用传入的函数得到HTTP元素,把这个HTTP元素包裹成一个Tuple1并返回就可以了。


代码:

// akka.http.scaladsl.server.directives.BasicDirectives#textract    
def textract[L: Tuple](f: RequestContext ? L): Directive[L] =
    Directive { inner ? ctx ? inner(f(ctx))(ctx) } 

解读:

extract方法只是把HTTP元素封装到了一个Tuple里,我才是真正负责创建Directive的通用的标准方法。我将试图创建一个包含一个TupleDirective(Tuple里的元素都是HTTP元素)。首先,我需要你提供给我一个函数,这个函数要能从一个RequestContext里面提取出封装了对应HTTP元素的Tuple,我将利用这个函数得到这个Tuple,显然,这个工作extract方法已经如实地完成了,然后,我将利用Directiveapply方法来创建这个Directive实例。为此我们需要先看一下apply方法的实现:

// akka.http.scaladsl.server.Directive#apply
def apply[T: Tuple](f: (T ? Route) ? Route): Directive[T] =
    new Directive[T] { def tapply(inner: T ? Route) = f(inner) }    

apply方法要求传入一个函数,这是一个高阶函数,它需要先利用另一个函数,这个函数接受一个Tuple,返回一个Route,它自己也返回一个RouteRoute是这样一种类型type Route = RequestContext ? Future[RouteResult]。为了更加容易地理解代码,我们先把类型展开,这样看起来会更加清晰:

f: (T ? Route) ? Route
f: ((T ? (RequestContext ? Future[RouteResult])) =>(RequestContext ? Future[RouteResult])

针对这个需要传入的高阶函数f,我给出了我的实现:

inner ? ctx ? inner(f(ctx))(ctx) 

对照函数f的声明,我们来一一核实实现代码中的每一个元素和类型,此处代码简短,却处处是函数式编程的浓郁味道,让我们慢慢地分析:

  • inner是apply方法函数参数f参数,它本身是一个函数,它的类型应该是T ? Route也就是T ? (RequestContext ? Future[RouteResult])
  • ctx ? inner(f(ctx))(ctx)是apply方法函数参数f返回值,它本身是一个函数,它的类型应该是Route也就是RequestContext ? Future[RouteResult]

让我们来推导一下ctx ? inner(f(ctx))(ctx)的类型,ctxRequestContext类型,没有任何问题,inner(f(ctx))(ctx)的类型呢?已知inner的类型是T ? (RequestContext ? Future[RouteResult]))inner(f(ctx))(ctx)是柯里化处理,f(ctx)的类型是T,所以inner(f(ctx))的类型是RequestContext ? Future[RouteResult],传递ctx参数之后,返回值的类型就是Future[RouteResult],即:inner(f(ctx))(ctx)的类型是Future[RouteResult],所以,ctx ? inner(f(ctx))(ctx)的类型就是:RequestContext ? Future[RouteResult]!!

深入解读

apply方法看上去很难理解,这也许是函数式编程的原因,但是读懂p它是非常重要的,实际上它声明需要的这个函数f: (T ? Route) ? Route正是对Directive和Route之间微妙关系的一种揭示。函数参数T ? Route被称之为内部路由,它通常是在Directive实例创建之后,在嵌套子Directive的时候传入的。在嵌套子Directive的时候会进行一个隐式转化,从而把Directive转换为一个路由,这一点后面我们会提到。

f: (T ? Route) ? Route这个函数类型设计是非常“tricky”的!它的参数也是一个函数,类型是T ? Route,注意:T的数据是来自当前的Directive的!第一个Route是外面传进来的子Directive的路由,第二个Route是在基于当前Directive的某些值封装一些逻辑附加到传进来的路由之上返回的新的路由。这个路由还将继续传递给后续的Directive进行接力,不断附加逻辑上去。

所以每一个内部路由,也就是inner,要做的就是根据当前Directive提供的“信息”封装路由相关的逻辑然后附加到已有的路由上,附加之后返回的新的路由就f的执行结果,一个新的Route,这种接受旧路由返回新路由的设计确保路由可以层层附加逻辑并传递下去!

Route是面向一个HTTP请求在处理流程上不断累加起来的一系列的“指令”的组合,Directive是解读一个HTTP请求时提取出来的一个一个的“指令”。Directive有很多种,每一种都含有一些特定的信息,比如path,method等等,每个Directive在构建的时候,都会把自己的这些信息“写”到路由上(内部路由),它写的并不是这些信息本身,而是从上下文里得到这些信息的“方法”!它从不往路由上“堆砌”值,它往路由上堆砌的是“逻辑”!这是函数式编程的风格和做法。在我们展示的这个例子里,completeget的内部路由,getcomplete的路由基础上添加了自己的“指令”:提取当前HTTP请求的请求方法(Get?Put?等等)。当路由“行进”到该指令时,就会取得当前HTTP请求的请求方法,然后和指令声明的get方法进行比对,如果请求的刚好是get方法,那就从这个指令继续向下路由,如果不是,就要询问一个其他的兄弟指令了。

我们来看最后一个问题:在val route1: Route = get(complete1)这行代码里,get返回的是一个Directive0,而它的apply方法声明要接受的参数是(? Route) ? Route,而我们传入的参数complete1是一个Route,这显然无法工作,所以这里必定发生了隐式转化,这个转换是:akka.http.scaladsl.server.Directive.addByNameNullaryApply

  implicit def addByNameNullaryApply(directive: Directive0): (? Route) ? Route =
    r ? directive.tapply(_ ? r)

首先要明确的是这个隐式转换返回的是一个函数,类型是(? Route) ? Route,需要注意的是(? Route)是 “call-by-name”的语法,和(()? Route) ? Route完全两码事。也就是说这个隐式转化是要把一个Directive0的实例转换成一个函数,这个函数接受一个Route型的参数,返回一个Route型的结果,但是参数是”call-by-name”的。

所以函数字面量r ? directive.tapply(_ ? r)完全等同于这样一个函数:

def f(r: =>Route): Route = directive.tapply(_ ? r)

关于函数式编程的思考

由于函数被类型化,函数的调用,特别是高阶函数的调用将会发生一些与传统函数调用有本质区别的变化,其中尤为重要的一点是:高阶函数的调用可能并不是在函数,而只是在“具体化”函数的定义,高阶函数执行结束的结果可能只是“细化了一个函数的具体行为”,然后把这个细化好的函数作为一个值传给了需要它的地方,留待后面调用!

非函数式编程时,函数的逻辑是确定的,函数式编程的一个困难的地方是,在高阶函数里,你无法准确地了解函数每一步的逻辑,那些作为参数传进来的函数,可能有很多不同的实现封装着不同的逻辑,当函数参数很多或嵌套层次加深时,会给阅读代码带来很大的困难,这也是人们抱怨函数式编程可读性差的一个原因!

<script type="text/javascript"> $(function () { $(‘pre.prettyprint code‘).each(function () { var lines = $(this).text().split(‘\n‘).length; var $numbering = $(‘
    ‘).addClass(‘pre-numbering‘).hide(); $(this).addClass(‘has-numbering‘).parent().append($numbering); for (i = 1; i <= lines; i++) { $numbering.append($(‘
  • ‘).text(i)); }; $numbering.fadeIn(1700); }); }); </script>

    AKKA-HTTP DSL源码解读