众所周知,Akka系统是基于Actor模式的分布式运算系统,非常适合构建大数据平台。所以,无可避免地会出现独立系统之间、与异类系统、与移动系统集成的需求。由于涉及到异类和移动系统,系统对接的方式必须在一套公开的标准之上进行,包括数据格式及数据传输标准。实际上针对标准的传输连接及标准数据编码、传输、解码全过程的软件编程是非常复杂及困难的。Akka-http正是这么一套能高效解决以上问题的编程工具。Akka-http是一套支持Tcp传输标准及Http标准数据的编程工具。

  Http模式的交流方式是固定单向的:一方永远为对话启动方,另一方永远是回应方。具体运作方式是:发起方构建一个Http消息结构即Request,通过Tcp把它传给接收方;接收方对消息进行解译并按照发起方编写在消息里的要求进行一些运算及构建一个回复消息即Response并把它传回给发送方。在实际应用中这两方形成了一种服务方server与客户方client的关系:客户方向服务方发送服务请求Request;服务方根据Request提供相应运算并用Response回应结果。

  Http消息的构成有两部分:一部分是对消息本身的描述,包括Http协议版本、字符集、加密方式、压缩方式、数据类型、安全机制等,另一部分就是消息的内容,即数据本身了,消息描述部分也有一些描述是针对数据的。

  从实际应用角度来看:在Tcp上通过Http消息交换实现了一种服务及服务使用计算模式。服务提供方server处于被动调用状态,客户方client通过Request向服务方提出服务要求,服务方按照要求在服务端进行相关运算后将结果用Response返回客户方。可以看出:服务端客户端双方都涉及到了Http消息的构建、解析、传输,而服务提供方则增加了针对Request服务要求分析逻辑及对应的运算服务。

  从更高应用层次来分析:系统集成实质上是两个系统之间通过Http协议实现数据交换。整个集成过程可以概括为:Client方将数据封装成Request;然后通过Tcp上传给Server;Server收到Request后进行解析;将Request里的数据解码成内部结构数据;按Request要求进行Server端运算;将运算结果数据封装成Response;然后将Response返回Client;Client对Response进行解析;将Response里的数据解码形成内部结构数据。

  Akka-http分别提供了服务端的Server-Side-Api和客户端的Client-Side-Api来帮助编程人员简化编程。两个Api都包括了对Http消息的构建、解析、传输帮助函数。Server-Side-Api还包括了一套DSL以方便Http-Server功能编程。

  对某个人群来说,Http是一个极其繁琐的协议:这里包括了消息格式、数据加码、解码、压缩、通讯协议、传输安全等等,等等。如果单纯按照Http协议编程的话将无法避免一堆新的定义及死板规定,无可避免影响编程效率。Akka-http应该正是为了这个人群而设计的。

  Akka-http对Http消息的各组成部分进行了建模:用class来代表数据结构。然后在各类的伴生对象中提供大量的帮助函数(helper)来辅助该类型的构建、匹配等操作。如此可以大大简化Http消息的操作编程。举个例子:Request和Response两种类型的消息定义如下:

/**
 * The immutable model HTTP request model.
 */
final class HttpRequest(
  val method:   HttpMethod,
  val uri:      Uri,
  val headers:  immutable.Seq[HttpHeader],
  val entity:   RequestEntity,
  val protocol: HttpProtocol)
...
/**
 * The immutable HTTP response model.
 */
final class HttpResponse(
  val status:   StatusCode,
  val headers:  immutable.Seq[HttpHeader],
  val entity:   ResponseEntity,
  val protocol: HttpProtocol)

object HttpRequest {
...
  /* Manual Case Class things, to ease bin-compat */

  def apply(
    method:   HttpMethod                = HttpMethods.GET,
    uri:      Uri                       = Uri./,
    headers:  immutable.Seq[HttpHeader] = Nil,
    entity:   RequestEntity             = HttpEntity.Empty,
    protocol: HttpProtocol              = HttpProtocols.`HTTP/1.1`) = new HttpRequest(method, uri, headers, entity, protocol)

  def unapply(any: HttpRequest) = new OptHttpRequest(any)
...
}

object HttpResponse {
  /* Manual Case Class things, to easen bin-compat */

  def apply(
    status:   StatusCode                = StatusCodes.OK,
    headers:  immutable.Seq[HttpHeader] = Nil,
    entity:   ResponseEntity            = HttpEntity.Empty,
    protocol: HttpProtocol              = HttpProtocols.`HTTP/1.1`) = new HttpResponse(status, headers, entity, protocol)

  def unapply(any: HttpResponse): OptHttpResponse = new OptHttpResponse(any)
}

这使得我们可以很容易的构建Request和Response: 

import HttpMethods._

// construct a simple GET request to `homeUri`
val homeUri = Uri("/abc")
HttpRequest(GET, uri = homeUri)

// construct simple GET request to "/index" (implicit string to Uri conversion)
HttpRequest(GET, uri = "/index")

// construct simple POST request containing entity
val data = ByteString("abc")
HttpRequest(POST, uri = "/receive", entity = data)

import StatusCodes._

// simple OK response without data created using the integer status code
HttpResponse(200)

// 404 response created using the named StatusCode constant
HttpResponse(NotFound)

// 404 response with a body explaining the error
HttpResponse(404, entity = "Unfortunately, the resource couldn't be found.")

// A redirecting response containing an extra header
val locationHeader = headers.Location("http://example.com/other")
HttpResponse(Found, headers = List(locationHeader))

Http消息中的Entity,Header也都用HttpEntity,HttpHeader类型进行了对应。

Uri的操作也是比较麻烦的,所以Akka-http也提供了Uri类型:

/**
 * An immutable model of an internet URI as defined by http://tools.ietf.org/html/rfc3986.
 * All members of this class represent the *decoded* URI elements (i.e. without percent-encoding).
 */
sealed abstract case class Uri(scheme: String, authority: Authority, path: Path, rawQueryString: Option[String],
                               fragment: Option[String]) {...}

这样可以方便简化Uri的构建、解析:

Uri("ftp://ftp.is.co.za/rfc/rfc1808.txt") shouldEqual
  Uri.from(scheme = "ftp", host = "ftp.is.co.za", path = "/rfc/rfc1808.txt")

Uri("http://www.ietf.org/rfc/rfc2396.txt") shouldEqual
  Uri.from(scheme = "http", host = "www.ietf.org", path = "/rfc/rfc2396.txt")

Uri("ldap://[2001:db8::7]/c=GB?objectClass?one") shouldEqual
  Uri.from(scheme = "ldap", host = "[2001:db8::7]", path = "/c=GB", queryString = Some("objectClass?one"))

Uri("mailto:John.Doe@example.com") shouldEqual
  Uri.from(scheme = "mailto", path = "John.Doe@example.com")

Uri("news:comp.infosystems.www.servers.unix") shouldEqual
  Uri.from(scheme = "news", path = "comp.infosystems.www.servers.unix")

Uri("tel:+1-816-555-1212") shouldEqual
  Uri.from(scheme = "tel", path = "+1-816-555-1212")

Uri("telnet://192.0.2.16:80/") shouldEqual
  Uri.from(scheme = "telnet", host = "192.0.2.16", port = 80, path = "/")

Uri("urn:oasis:names:specification:docbook:dtd:xml:4.1.2") shouldEqual
  Uri.from(scheme = "urn", path = "oasis:names:specification:docbook:dtd:xml:4.1.2")

Http Server部分也是系统集成的主要部分,因为在绝大多数的情况下Http-Server就处于数据平台上,负责汇集系统数据及与其它系统共享平台数据。这种集成功能一般是通过用Http-Server在平台上构建Rest数据服务来实现的。由于Akka-http是基于Akka-stream功能之上的,它支持Http数据的流操作,也就是说它可以把一个Stream-Source放在Http消息的数据里,然后Akka-http的Client-Side-Api可以运算这些Source。如此可以大大方便数据库之间的数据交换,提高数据集成效率。不过Streaming功能只能在Akka-http-Api内实现。但用Akka-http-Server-Side-Api也可以很方便的实现标准Rest服务使其它异类系统可以顺利调用。下面我们就用Akka-http做个Hello World Rest服务示范: 

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.model._
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import scala.concurrent._


object HelloHttp extends App {
  implicit val httpSys = ActorSystem("httpSys")
  implicit val httpMat = ActorMaterializer()
  implicit val httpEc = httpSys.dispatcher

  val (host,port) = ("localhost",8088)

  val services: Flow[HttpRequest, HttpResponse, Any] = path("hello") {
    get {
      complete(HttpEntity(ContentTypes.`text/html(UTF-8)`,"<h> Hello World! </h>"))
    }
  }

  val futBinding: Future[Http.ServerBinding] = Http().bindAndHandle(services,host,port)

  println(s"Server running at $host $port. Press any key to exit ...")

  scala.io.StdIn.readLine()

  futBinding.flatMap(_.unbind())
    .onComplete(_ => httpSys.terminate())
}

可以看到,用Akka-http可以很容易的实现一个Rest服务架构。

 

 

 

 

 

 

 

内容来源于网络如有侵权请私信删除
你还没有登录,请先登录注册
  • 还没有人评论,欢迎说说您的想法!