@ikhoon
Last active May 3, 2018 07:05
Example of using Reactive Streams
import org.reactivestreams.{Processor, Publisher, Subscriber, Subscription}
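// Produce data.
// Simplified for the demo: this publisher never calls onSubscribe and ignores
// demand, so it pushes all 100 elements synchronously as soon as it is subscribed to.
val publisher = new Publisher[Int] {
  def subscribe(s: Subscriber[_ >: Int]): Unit = {
    (1 to 100).foreach { x =>
      s.onNext(x)
    }
    s.onComplete()
  }
}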
// Transform data
val processor = new Processor[Int, String] {
  val f: Int => String = x => s"string value of $x"
  var subscriber: Subscriber[_ >: String] = _ // downstream
  override def subscribe(s: Subscriber[_ >: String]): Unit = subscriber = s
  // Only runs if the upstream publisher calls onSubscribe.
  override def onSubscribe(s: Subscription): Unit = {
    println("processor subscribe")
    s.request(1)
  }
  override def onNext(t: Int): Unit = { // receive data from upstream
    println("processor on next")
    subscriber.onNext(f(t)) // pass the transformed data downstream
  }
  override def onError(t: Throwable): Unit = {
    println("processor on error")
    subscriber.onError(t) // propagate the failure downstream
  }
  override def onComplete(): Unit = {
    println("processor on complete")
    subscriber.onComplete() // propagate completion downstream
  }
}
// Consume data
val subscriber = new Subscriber[String] {
  override def onSubscribe(s: Subscription): Unit = {
    println("start subscription")
  }
  override def onNext(t: String): Unit = {
    println(s"on next : $t")
  }
  override def onError(t: Throwable): Unit = {
    println(s"on error: $t")
  }
  override def onComplete(): Unit = {
    println("on complete")
  }
}
processor.subscribe(subscriber)
publisher.subscribe(processor)
/* result (excerpt; the run continues through 100)
processor on next
on next : string value of 1
processor on next
on next : string value of 2
processor on next
on next : string value of 3
processor on next
on next : string value of 4
processor on next
on next : string value of 5
processor on next
on next : string value of 6
processor on next
on next : string value of 7
processor on next
on next : string value of 8
processor on next
on next : string value of 9
processor on next
on next : string value of 10
processor on next
on next : string value of 11
processor on next
on next : string value of 12
processor on next
on next : string value of 13
processor on next
on next : string value of 14
processor on next
on next : string value of 15
processor on next
on next : string value of 16
processor on next
on next : string value of 17
processor on next
*/
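The gist's publisher pushes every element without waiting for `request(n)`. As a rough sketch of how the same pipeline might look when demand is honored, the block below wires a demand-aware source through a mapping stage into a subscriber that pulls one element at a time. `BackpressureSketch`, `RangePublisher`, `MapProcessor`, `CollectingSubscriber`, and `run` are hypothetical names, not part of the gist or of the `org.reactivestreams` library. It is single-threaded and assumes the downstream subscriber is attached before the stage subscribes upstream, so it should be read as an illustration rather than a fully spec-compliant implementation.

```scala
import org.reactivestreams.{Processor, Publisher, Subscriber, Subscription}
import scala.collection.mutable.ListBuffer

object BackpressureSketch {

  // Hypothetical source: emits from..to, never more than the outstanding demand.
  class RangePublisher(from: Int, to: Int) extends Publisher[Int] {
    override def subscribe(s: Subscriber[_ >: Int]): Unit =
      s.onSubscribe(new Subscription {
        private var next: Long = from.toLong // Long, so incrementing past Int.MaxValue cannot wrap
        private var demand = 0L
        private var emitting = false // guards against re-entrant request() calls from onNext
        private var done = false

        override def request(n: Long): Unit =
          if (done) ()
          else if (n <= 0) {
            done = true
            s.onError(new IllegalArgumentException("request(n) requires n > 0"))
          } else {
            demand = if (demand + n < 0) Long.MaxValue else demand + n // saturate on overflow
            if (!emitting) {
              emitting = true
              while (!done && demand > 0 && next <= to) {
                val value = next.toInt
                next += 1
                demand -= 1
                s.onNext(value) // may call request() again; that only increases demand
              }
              emitting = false
              if (!done && next > to) { done = true; s.onComplete() }
            }
          }

        override def cancel(): Unit = done = true
      })
  }

  // Hypothetical 1:1 mapping stage. It hands the upstream Subscription straight to
  // the downstream subscriber, so downstream demand reaches the source unchanged.
  // Assumption: subscribe(downstream) is called before onSubscribe arrives.
  class MapProcessor[A, B](f: A => B) extends Processor[A, B] {
    private var downstream: Subscriber[_ >: B] = null
    override def subscribe(s: Subscriber[_ >: B]): Unit = downstream = s
    override def onSubscribe(s: Subscription): Unit = downstream.onSubscribe(s)
    override def onNext(t: A): Unit = downstream.onNext(f(t))
    override def onError(t: Throwable): Unit = downstream.onError(t)
    override def onComplete(): Unit = downstream.onComplete()
  }

  // Pulls one element at a time: request(1) on subscribe, then again after each onNext.
  class CollectingSubscriber extends Subscriber[String] {
    val received = ListBuffer.empty[String]
    var error: Option[Throwable] = None
    var completed = false
    private var subscription: Subscription = null
    override def onSubscribe(s: Subscription): Unit = { subscription = s; s.request(1) }
    override def onNext(t: String): Unit = { received += t; subscription.request(1) }
    override def onError(t: Throwable): Unit = error = Some(t)
    override def onComplete(): Unit = completed = true
  }

  def run(): CollectingSubscriber = {
    val sink = new CollectingSubscriber
    val stage = new MapProcessor[Int, String](x => s"string value of $x")
    stage.subscribe(sink)                     // attach downstream first, as in the gist
    new RangePublisher(1, 3).subscribe(stage) // then connect the stage to the source
    sink
  }
}
```

Calling `BackpressureSketch.run()` returns the subscriber after the synchronous run. Its `received` buffer holds the three mapped strings and `completed` is true. Passing the upstream `Subscription` through unchanged only works because the stage maps one input to exactly one output; a stage that filters or batches would need its own demand bookkeeping.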
@karellen-kim

👍 👍 👍 👍 👍