Skip to content

Instantly share code, notes, and snippets.

@ikhoon
Last active May 3, 2018 07:05
Show Gist options
  • Save ikhoon/e7d93fc492dcc19dc6797a90ae4adb8c to your computer and use it in GitHub Desktop.
Save ikhoon/e7d93fc492dcc19dc6797a90ae4adb8c to your computer and use it in GitHub Desktop.
Reactive Streams를 활용한 예제
import org.reactivestreams.{Processor, Publisher, Subscriber, Subscription}
// Data source: emits the integers 1 to 100, then completes.
// NOTE: everything in this example runs synchronously on the calling thread;
// none of the mutable state below is safe for concurrent signalling.
val publisher = new Publisher[Int] {

  /**
   * Subscribes `s` to a fresh pass over 1..100.
   *
   * `onSubscribe` is always the first signal `s` receives (Reactive Streams
   * rule 1.9), and values are emitted only while there is outstanding demand.
   *
   * @param s the subscriber to feed
   */
  def subscribe(s: Subscriber[_ >: Int]): Unit = {
    val values = (1 to 100).iterator // each subscriber gets its own iterator

    s.onSubscribe(new Subscription {
      private var demand = 0L      // requested but not yet emitted
      private var draining = false // true while the emit loop is on the stack
      private var done = false     // set on cancel, error or completion

      /** Adds `n` to the outstanding demand and emits as much as is allowed. */
      def request(n: Long): Unit =
        if (!done) {
          if (n <= 0) {
            // Rule 3.9: non-positive demand is answered with onError.
            done = true
            s.onError(new IllegalArgumentException(s"request($n): demand must be positive"))
          } else {
            val total = demand + n
            demand = if (total < 0) Long.MaxValue else total // saturate instead of overflowing
            drain()
          }
        }

      /** Stops all further signals to the subscriber. */
      def cancel(): Unit = done = true

      // Emits while demand remains. A request() made from inside onNext only
      // bumps `demand` and returns (the loop below picks it up), so
      // synchronous subscribers cannot cause unbounded recursion.
      private def drain(): Unit =
        if (!draining) {
          draining = true
          try {
            while (!done && demand > 0 && values.hasNext) {
              demand -= 1
              s.onNext(values.next())
            }
            if (!done && !values.hasNext) {
              done = true // terminal signal is sent exactly once
              s.onComplete()
            }
          } finally draining = false
        }
    })
  }
}

// Data transformation: turns every Int from upstream into a String for downstream.
val processor = new Processor[Int, String] {
  val f: Int => String = x => s"string value of ${x.toString}"

  private var downstream: Option[Subscriber[_ >: String]] = None
  private var upstream: Option[Subscription] = None
  private var pendingDemand = 0L // downstream demand received before upstream was connected
  private var cancelled = false

  /**
   * Registers the downstream subscriber and hands it a Subscription.
   * Demand and cancellation are relayed to upstream; demand that arrives
   * before upstream is connected is buffered. A later call replaces the
   * previously registered subscriber, as in the original example.
   */
  override def subscribe(s: Subscriber[_ >: String]): Unit = {
    downstream = Some(s)
    s.onSubscribe(new Subscription {
      def request(n: Long): Unit =
        if (!cancelled) {
          if (n <= 0) {
            cancel()
            s.onError(new IllegalArgumentException(s"request($n): demand must be positive"))
          } else upstream match {
            case Some(up) => up.request(n)
            case None =>
              val total = pendingDemand + n
              pendingDemand = if (total < 0) Long.MaxValue else total
          }
        }

      def cancel(): Unit = {
        cancelled = true
        upstream.foreach(_.cancel())
      }
    })
  }

  /** Stores the upstream subscription and replays any demand buffered so far. */
  override def onSubscribe(s: Subscription): Unit = {
    println("processor subscribe")
    if (cancelled || upstream.isDefined) s.cancel() // rule 2.5: refuse a second or unwanted subscription
    else {
      upstream = Some(s)
      if (pendingDemand > 0) {
        val n = pendingDemand
        pendingDemand = 0L
        s.request(n)
      }
    }
  }

  /** Receives a value from upstream and passes the mapped value downstream. */
  override def onNext(t: Int): Unit = {
    println("processor on next")
    downstream.foreach(_.onNext(f(t)))
  }

  /** Relays failure downstream so the consumer learns the stream has ended. */
  override def onError(t: Throwable): Unit = {
    println("processor on error")
    downstream.foreach(_.onError(t))
  }

  /** Relays completion downstream so the consumer learns the stream has ended. */
  override def onComplete(): Unit = {
    println("processor on complete")
    downstream.foreach(_.onComplete())
  }
}

// Data consumption: prints every element, pulling them one at a time.
val subscriber = new Subscriber[String] {
  private var subscription: Option[Subscription] = None

  /** Keeps the subscription and asks for the first element. */
  override def onSubscribe(s: Subscription): Unit = {
    println("start subscription")
    if (subscription.isDefined) s.cancel() // rule 2.5: only one active subscription
    else {
      subscription = Some(s)
      s.request(1)
    }
  }

  /** Prints the element, then signals readiness for exactly one more. */
  override def onNext(t: String): Unit = {
    println(s"on next : $t")
    subscription.foreach(_.request(1))
  }

  override def onError(t: Throwable): Unit = {
    println(s"on error: $t")
  }

  override def onComplete(): Unit = {
    println(s"on complete")
  }
}

// Wire the pipeline back to front: consumer -> processor, then processor -> source.
processor.subscribe(subscriber)
publisher.subscribe(processor)
/* expected result (traced by hand from the code above)
start subscription
processor subscribe
processor on next
on next : string value of 1
processor on next
on next : string value of 2
processor on next
on next : string value of 3
... (the same pair of lines for 4 through 99) ...
processor on next
on next : string value of 100
processor on complete
on complete
*/
@ikhoon
Copy link
Author

ikhoon commented May 2, 2018

pull based stream에서 동작하기

import cats.effect.IO
import fs2._
import fs2.interop.reactivestreams._
import org.reactivestreams.{Subscriber, Subscription}
import slick.basic.DatabasePublisher

import scala.concurrent.ExecutionContext.Implicits.global


/**
 * Builds a new demand-driven publisher over the fixed values 1 to 10.
 *
 * Because this is a `def`, every call returns a fresh publisher whose cursor
 * starts at the first element. The cursor lives on the publisher, not on the
 * subscription, so one instance is meant for a single subscriber. The
 * implementation is synchronous and not safe for concurrent signalling.
 */
def databasePublisher = new DatabasePublisher[Int] {
  val xs = Vector(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) // Vector: constant-time indexed reads
  var cursor = 0 // index of the next element to emit

  /**
   * Hands `subscriber` a Subscription that emits at most as many elements as
   * were requested and completes as soon as the last element has been sent.
   */
  def subscribe(subscriber: Subscriber[_ >: Int]): Unit =
    subscriber.onSubscribe(new Subscription {
      private var demand = 0L      // requested but not yet emitted (kept as Long: no Int truncation)
      private var draining = false // true while the emit loop is on the stack
      private var terminated = false

      /** Adds `n` to the outstanding demand; non-positive `n` fails the stream. */
      def request(n: Long): Unit =
        if (!terminated) {
          if (n <= 0) {
            terminated = true
            subscriber.onError(new IllegalArgumentException(s"request($n): demand must be positive"))
          } else {
            val total = demand + n
            demand = if (total < 0) Long.MaxValue else total // saturate instead of overflowing
            drain()
          }
        }

      /** Stops emission; as before, it also moves the shared cursor to the end. */
      def cancel(): Unit = {
        terminated = true
        cursor = xs.length
      }

      // Emits one element at a time so that cancel() or a nested request()
      // issued from onNext takes effect immediately and in order.
      private def drain(): Unit =
        if (!draining) {
          draining = true
          try {
            while (!terminated && demand > 0 && cursor < xs.length) {
              val next = xs(cursor)
              cursor += 1
              demand -= 1
              subscriber.onNext(next)
            }
            if (!terminated && cursor >= xs.length) {
              terminated = true // onComplete is sent exactly once
              subscriber.onComplete()
            }
          } finally draining = false
        }
    })
}

// databasePublisher is a def, so each statement below gets a new publisher
// and starts reading from the first element again.

// Take fewer elements than the source holds.
databasePublisher.toStream[IO].take(5).compile.toList.unsafeRunSync()
// res0: List[Int] = List(1, 2, 3, 4, 5)

// Take exactly as many elements as the source holds.
databasePublisher.toStream[IO].take(10).compile.toList.unsafeRunSync()
// res1: List[Int] = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

// Ask for more than the source holds: the stream ends after the tenth element.
databasePublisher.toStream[IO].take(13).compile.toList.unsafeRunSync()
// res2: List[Int] = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

@ikhoon
Copy link
Author

ikhoon commented May 2, 2018

이거였구나...

// A DatabasePublisher over the values 1 to 10 that leaves all subscription
// handling to the publisher fs2 produces via toUnicastPublisher.
val databasePublisher2 = new DatabasePublisher[Int] {
  // The fs2 stream 1..10, typed to run in IO, exposed as a Reactive Streams publisher.
  val pub = {
    val numbers = Stream.emits(1 to 10).covary[IO]
    numbers.toUnicastPublisher()
  }

  // Every subscriber is handed straight to the fs2-backed publisher.
  def subscribe(s: Subscriber[_ >: Int]): Unit = pub.subscribe(s)
}


// The same three runs against the fs2-backed publisher. databasePublisher2 is a
// single val, yet the recorded results show every run starting from 1 again.

// Take fewer elements than the source holds.
databasePublisher2.toStream[IO].take(5).compile.toList.unsafeRunSync()
// res0: List[Int] = List(1, 2, 3, 4, 5)

// Take exactly as many elements as the source holds.
databasePublisher2.toStream[IO].take(10).compile.toList.unsafeRunSync()
// res1: List[Int] = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

// Ask for more than the source holds: the stream ends after the tenth element.
databasePublisher2.toStream[IO].take(13).compile.toList.unsafeRunSync()
// res2: List[Int] = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

@karellen-kim
Copy link

👍 👍 👍 👍 👍

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment