博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Akka(43): Http:SSE-Server Sent Event - 服务端主推消息
阅读量:4959 次
发布时间:2019-06-12

本文共 6911 字,大约阅读时间需要 23 分钟。

   因为我了解Akka-http的主要目的不是为了有关Web-Server的编程,而是想实现一套系统集成的api,所以也需要考虑由服务端主动向客户端发送指令的应用场景。比如一个零售店管理平台的服务端在完成了某些数据更新后需要通知各零售门市客户端下载最新数据。虽然Akka-http也提供对websocket协议的支持,但websocket的网络连接是双向恒久的,适合频繁的问答交互式服务端与客户端的交流,消息结构也比较零碎。而我们面临的可能是批次型的大量数据库数据交换,只需要简单的服务端单向消息就行了,所以websocket不太合适,而Akka-http的SSE应该比较适合我们的要求。SSE模式的基本原理是服务端统一集中发布消息,各客户端持久订阅服务端发布的消息并从消息的内容中筛选出属于自己应该执行的指令,然后进行相应的处理。客户端接收SSE是在一个独立的线程里不断进行的,不会影响客户端当前的运算流程。当收到有用的消息后就会调用一个业务功能函数作为后台异步运算任务。

服务端的SSE发布是以Source[ServerSentEvent,NotUsed]来实现的。ServerSentEvent类型定义如下:

/** * Representation of a server-sent event. According to the specification, an empty data field designates an event * which is to be ignored which is useful for heartbeats. * * @param data data, may span multiple lines * @param eventType optional type, must not contain \n or \r * @param id optional id, must not contain \n or \r * @param retry optional reconnection delay in milliseconds */final case class ServerSentEvent(  data:      String,  eventType: Option[String] = None,  id:        Option[String] = None,  retry:     Option[Int]    = None) {...}

这个类型的参数代表事件消息的数据结构。用户可以根据实际需要充分利用这个数据结构来传递消息。服务端是通过complete以SeverSentEvent类为元素的Source来进行SSE的,如下:

import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._         complete {            Source              .tick(2.seconds, 2.seconds, NotUsed)              .map( _ => processToServerSentEvent)              .keepAlive(1.second, () => ServerSentEvent.heartbeat)          }

以上代码代表服务端定时运算processToServerSentEvent返回ServerSentEvent类型结果后发布给所有订阅的客户端。我们用一个函数processToServerSentEvent模拟重复运算的业务功能:

private def processToServerSentEvent: ServerSentEvent = {    Thread.sleep(3000)   //processing delay    ServerSentEvent(SyncFiles.fileToSync)  }

这个函数模拟发布事件数据是某种业务运算结果,在这里代表客户端需要下载文件名称。我们用客户端request来模拟设定这个文件名称:

object SyncFiles {    var fileToSync: String = ""  }  private def route = {    import Directives._    import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._    def syncRequests =      pathPrefix("sync") {        pathSingleSlash {        post {            parameter("file") { filename =>              complete {                SyncFiles.fileToSync = filename                s"set download file to : $filename"              }            }          }        }      }

客户端订阅SSE的方式如下:

import akka.http.scaladsl.unmarshalling.sse.EventStreamUnmarshalling._    import system.dispatcher        Http()      .singleRequest(Get("http://localhost:8011/events"))      .flatMap(Unmarshal(_).to[Source[ServerSentEvent, NotUsed]])      .foreach(_.runForeach(se => downloadFiles(se.data)))

每当客户端收到SSE后即运行downloadFiles(filename)函数。downloadFiles函数定义:

def downloadFiles(file: String) = {    Thread.sleep(3000)   //process delay    if (file != "")      println(s"Try to download $file")  }

下面是客户端程序的测试运算步骤:

scala.io.StdIn.readLine()    println("do some thing ...")    Http().singleRequest(      HttpRequest(method=HttpMethods.POST,uri = "http://localhost:8011/sync/?file=Orders")    ).onSuccess {      case msg => println(msg)    }    scala.io.StdIn.readLine()    println("do some other things ...")    Http().singleRequest(      HttpRequest(method=HttpMethods.POST,uri = "http://localhost:8011/sync/?file=Items")    ).onSuccess {      case msg => println(msg)    }

运算结果:

do some thing ...HttpResponse(200 OK,List(Server: akka-http/10.0.10, Date: Fri, 15 Dec 2017 05:50:52 GMT),HttpEntity.Strict(text/plain; charset=UTF-8,set download file to : Orders),HttpProtocol(HTTP/1.1))Try to download OrdersTry to download Ordersdo some other things ...HttpResponse(200 OK,List(Server: akka-http/10.0.10, Date: Fri, 15 Dec 2017 05:51:02 GMT),HttpEntity.Strict(text/plain; charset=UTF-8,set download file to : Items),HttpProtocol(HTTP/1.1))Try to download OrdersTry to download OrdersTry to download ItemsTry to download ItemsTry to download ItemsProcess finished with exit code 0

下面是本次讨论的示范源代码:

服务端:

import akka.NotUsedimport akka.actor.ActorSystemimport akka.http.scaladsl.Httpimport akka.http.scaladsl.server.Directivesimport akka.stream.ActorMaterializerimport akka.stream.scaladsl.Sourceimport scala.concurrent.duration.DurationIntimport akka.http.scaladsl.model.sse.ServerSentEventobject SSEServer {  def main(args: Array[String]): Unit = {    implicit val system = ActorSystem()    implicit val mat    = ActorMaterializer()    Http().bindAndHandle(route, "localhost", 8011)    scala.io.StdIn.readLine()    system.terminate()  }  object SyncFiles {    var fileToSync: String = ""  }  private def route = {    import Directives._    import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._    def syncRequests =      pathPrefix("sync") {        pathSingleSlash {        post {            parameter("file") { filename =>              complete {                SyncFiles.fileToSync = filename                s"set download file to : $filename"              }            }          }        }      }    def events =      path("events") {        get {          complete {            Source              .tick(2.seconds, 2.seconds, NotUsed)              .map( _ => processToServerSentEvent)              .keepAlive(1.second, () => ServerSentEvent.heartbeat)          }        }      }    syncRequests ~ events  }  private def processToServerSentEvent: ServerSentEvent = {    Thread.sleep(3000)   //processing delay    ServerSentEvent(SyncFiles.fileToSync)  }}

客户端:

import akka.NotUsedimport akka.actor.ActorSystemimport akka.http.scaladsl.Httpimport akka.http.scaladsl.client.RequestBuilding.Getimport akka.http.scaladsl.model.HttpMethodsimport akka.http.scaladsl.unmarshalling.Unmarshalimport akka.stream.ActorMaterializerimport akka.stream.scaladsl.Sourceimport akka.http.scaladsl.model.sse.ServerSentEventimport akka.http.scaladsl.model._object SSEClient {  def downloadFiles(file: String) = {    Thread.sleep(3000)   //process delay    if (file != "")      println(s"Try to download $file")  }  def main(args: Array[String]): Unit = {    implicit val system = ActorSystem()    implicit val mat    = ActorMaterializer()    import akka.http.scaladsl.unmarshalling.sse.EventStreamUnmarshalling._    import system.dispatcher    Http()      .singleRequest(Get("http://localhost:8011/events"))      .flatMap(Unmarshal(_).to[Source[ServerSentEvent, NotUsed]])      .foreach(_.runForeach(se => downloadFiles(se.data)))    scala.io.StdIn.readLine()    println("do some thing ...")    Http().singleRequest(      HttpRequest(method=HttpMethods.POST,uri = "http://localhost:8011/sync/?file=Orders")    ).onSuccess {      case msg => println(msg)    }    scala.io.StdIn.readLine()    println("do some other things ...")    Http().singleRequest(      HttpRequest(method=HttpMethods.POST,uri = "http://localhost:8011/sync/?file=Items")    ).onSuccess {      case msg => println(msg)    }    scala.io.StdIn.readLine()    system.terminate()  }}

 

 我的博客即将同步至腾讯云+社区。邀大家一同入驻http://cloud.tencent.com/developer/support-plan

 

 

 

 

 

 

 

 

 

 

转载于:https://www.cnblogs.com/tiger-xc/p/8042765.html

你可能感兴趣的文章
iTextSharp带中文转换出来的PDF文档显示乱码
查看>>
组件:slot插槽
查看>>
走进C++程序世界------异常处理
查看>>
Nginx配置文件nginx.conf中文详解(转)
查看>>
POJ 1988 Cube Stacking
查看>>
POJ 1308 Is It A Tree?(并查集)
查看>>
N进制到M进制的转换问题
查看>>
Android------三种监听OnTouchListener、OnLongClickListener同时实现即其中返回值true或者false的含义...
查看>>
MATLAB实现多元线性回归预测
查看>>
Mac xcode 配置OpenGL
查看>>
利用sed把一行的文本文件改成每句一行
查看>>
使用Asyncio的Coroutine来实现一个有限状态机
查看>>
Android应用开发:核心技术解析与最佳实践pdf
查看>>
python——爬虫
查看>>
2.2 标识符
查看>>
孤荷凌寒自学python第五十八天成功使用python来连接上远端MongoDb数据库
查看>>
求一个字符串中最长回文子串的长度(承接上一个题目)
查看>>
简单权限管理系统原理浅析
查看>>
springIOC第一个课堂案例的实现
查看>>
求输入成绩的平均分
查看>>