/**
 * Representation of a server-sent event. According to the specification, an event with an empty data field
 * is to be ignored, which makes such events useful as heartbeats.
 *
 * @param data data, may span multiple lines
 * @param eventType optional type, must not contain \n or \r
 * @param id optional id, must not contain \n or \r
 * @param retry optional reconnection delay in milliseconds
 */
final case class ServerSentEvent(
    data: String,
    eventType: Option[String] = None,
    id: Option[String] = None,
    retry: Option[Int] = None) {...}
import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._

// Answer the request with a stream of server-sent events.
complete {
  // One tick after an initial 2 seconds, then one every 2 seconds.
  val ticks = Source.tick(2.seconds, 2.seconds, NotUsed)
  // Each tick is turned into an event describing the file to synchronise.
  val events = ticks.map { _ => processToServerSentEvent }
  // Emit a heartbeat event whenever the stream has been idle for 1 second.
  events.keepAlive(1.second, () => ServerSentEvent.heartbeat)
}
/** Builds an event whose data is the current value of `SyncFiles.fileToSync`, after a 3 second pause. */
private def processToServerSentEvent: ServerSentEvent = {
  // processing delay
  Thread.sleep(3000)
  val pendingFile = SyncFiles.fileToSync
  ServerSentEvent(pendingFile)
}
// Holds the name of the file that clients should synchronise.
object SyncFiles {
  // Overwritten by the "sync" route below; starts out as the empty string.
  var fileToSync: String = ""
}

// NOTE(review): this snippet of `route` is truncated — the closing brace of its body is not shown here.
private def route = {
  import Directives._
  import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._

  // POST /sync/?file=<name>: stores <name> in SyncFiles.fileToSync and replies with a confirmation text.
  def syncRequests =
    pathPrefix("sync") {
      pathSingleSlash {
        post {
          parameter("file") { filename =>
            complete {
              SyncFiles.fileToSync = filename
              s"set download file to : $filename"
            }
          }
        }
      }
    }
import akka.http.scaladsl.unmarshalling.sse.EventStreamUnmarshalling._
import system.dispatcher

// Request the event endpoint and decode the response entity as a stream of server-sent events.
val eventStream = for {
  response <- Http().singleRequest(Get("http://localhost:8011/events"))
  events   <- Unmarshal(response).to[Source[ServerSentEvent, NotUsed]]
} yield events

// Once the stream is available, run it and hand the data of every event to downloadFiles.
eventStream.foreach { events =>
  events.runForeach(event => downloadFiles(event.data))
}
/**
 * Pauses for 3 seconds and then prints a download message for `file`.
 * Nothing is printed when `file` is the empty string.
 *
 * @param file name of the file to download
 */
def downloadFiles(file: String) = {
  // process delay
  Thread.sleep(3000)
  file match {
    case ""   => ()
    case name => println(s"Try to download $name")
  }
}
// Sends POST /sync/?file=<file> and prints the response. A failed request is reported on stderr
// instead of being dropped silently (the deprecated `onSuccess` only ever saw successful results).
// The name is spliced into the URI as-is, so callers must pass URL-safe names.
def requestSync(file: String): Unit =
  Http()
    .singleRequest(HttpRequest(method = HttpMethods.POST, uri = s"http://localhost:8011/sync/?file=$file"))
    .onComplete {
      case scala.util.Success(msg) => println(msg)
      case scala.util.Failure(err) => Console.err.println(s"Sync request for $file failed: $err")
    }

// Wait for the user to press RETURN before each request.
scala.io.StdIn.readLine()
println("do some thing ...")
requestSync("Orders")

scala.io.StdIn.readLine()
println("do some other things ...")
requestSync("Items")
do some thing ...
HttpResponse(200 OK,List(Server: akka-http/10.0.10, Date: Fri, 15 Dec 2017 05:50:52 GMT),HttpEntity.Strict(text/plain; charset=UTF-8,set download file to : Orders),HttpProtocol(HTTP/1.1))
Try to download Orders
Try to download Orders
do some other things ...
HttpResponse(200 OK,List(Server: akka-http/10.0.10, Date: Fri, 15 Dec 2017 05:51:02 GMT),HttpEntity.Strict(text/plain; charset=UTF-8,set download file to : Items),HttpProtocol(HTTP/1.1))
Try to download Orders
Try to download Orders
Try to download Items
Try to download Items
Try to download Items

Process finished with exit code 0
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import scala.concurrent.duration.DurationInt
import akka.http.scaladsl.model.sse.ServerSentEvent

/**
 * Demo server on localhost:8011 with two endpoints:
 *
 *  - `POST /sync/?file=<name>` selects the file that clients should synchronise;
 *  - `GET /events` streams the selected file name to clients as server-sent events.
 */
object SSEServer {

  /** Binds the routes, waits for RETURN on stdin, then unbinds and shuts the actor system down. */
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem()
    implicit val mat: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    // Keep the binding so that a failed bind (e.g. port already in use) is reported
    // and the port can be released before the actor system terminates.
    val binding = Http().bindAndHandle(route, "localhost", 8011)
    binding.failed.foreach { err =>
      Console.err.println(s"Could not bind to localhost:8011: $err")
    }

    scala.io.StdIn.readLine()
    binding
      .flatMap(_.unbind())
      .onComplete(_ => system.terminate()) // terminate even when binding or unbinding failed
  }

  /** Holds the name of the file that clients should synchronise. */
  object SyncFiles {
    // Written by the "sync" route and read while building events, i.e. from different threads;
    // @volatile makes the latest write visible to the reader.
    @volatile var fileToSync: String = ""
  }

  private def route = {
    import Directives._
    import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._

    // POST /sync/?file=<name>: remember <name> and reply with a confirmation text.
    def syncRequests =
      pathPrefix("sync") {
        pathSingleSlash {
          post {
            parameter("file") { filename =>
              complete {
                SyncFiles.fileToSync = filename
                s"set download file to : $filename"
              }
            }
          }
        }
      }

    // GET /events: one event per tick (initial delay 2 seconds, then every 2 seconds),
    // with a heartbeat injected whenever the stream has been idle for 1 second.
    def events =
      path("events") {
        get {
          complete {
            Source
              .tick(2.seconds, 2.seconds, NotUsed)
              .map(_ => processToServerSentEvent)
              .keepAlive(1.second, () => ServerSentEvent.heartbeat)
          }
        }
      }

    syncRequests ~ events
  }

  /** Builds an event carrying the currently selected file name, after a simulated processing delay. */
  private def processToServerSentEvent: ServerSentEvent = {
    // processing delay; note that Thread.sleep blocks the calling thread for the whole 3 seconds
    Thread.sleep(3000)
    ServerSentEvent(SyncFiles.fileToSync)
  }
}
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.client.RequestBuilding.Get
import akka.http.scaladsl.model.HttpMethods
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import akka.http.scaladsl.model.sse.ServerSentEvent
import akka.http.scaladsl.model._
import scala.util.{Failure, Success}

/**
 * Demo client: subscribes to the event stream on localhost:8011 and, on each RETURN pressed on stdin,
 * asks the server to switch the file to synchronise (first "Orders", then "Items").
 */
object SSEClient {

  /**
   * Pauses for 3 seconds and then prints a download message for `file`.
   * Nothing is printed when `file` is the empty string.
   *
   * @param file name of the file to download
   */
  def downloadFiles(file: String): Unit = {
    Thread.sleep(3000) // process delay
    if (file != "") println(s"Try to download $file")
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem()
    implicit val mat: ActorMaterializer = ActorMaterializer()
    import akka.http.scaladsl.unmarshalling.sse.EventStreamUnmarshalling._
    import system.dispatcher

    // Sends POST /sync/?file=<file> and prints the response; a failed request is reported on stderr
    // instead of being dropped silently. Uri.Query takes care of encoding the file name.
    def requestSync(file: String): Unit = {
      val target = Uri("http://localhost:8011/sync/").withQuery(Uri.Query("file" -> file))
      Http()
        .singleRequest(HttpRequest(method = HttpMethods.POST, uri = target))
        .onComplete {
          case Success(msg) => println(msg)
          case Failure(err) => Console.err.println(s"Sync request for $file failed: $err")
        }
    }

    // Subscribe to the event endpoint and decode the response as a stream of server-sent events.
    val eventStream = Http()
      .singleRequest(Get("http://localhost:8011/events"))
      .flatMap(Unmarshal(_).to[Source[ServerSentEvent, NotUsed]])

    // Hand the data of every received event to downloadFiles.
    eventStream.foreach(_.runForeach(se => downloadFiles(se.data)))
    // Report a failed connection or decoding error rather than ignoring it.
    eventStream.failed.foreach { err =>
      Console.err.println(s"Could not subscribe to http://localhost:8011/events: $err")
    }

    scala.io.StdIn.readLine()
    println("do some thing ...")
    requestSync("Orders")

    scala.io.StdIn.readLine()
    println("do some other things ...")
    requestSync("Items")

    scala.io.StdIn.readLine()
    system.terminate()
  }
}