A presentation at KotlinConf in November 2017 in San Francisco, CA, USA by Annyce Davis
a learning curve
a steep learning curve
you a steep learning curve
superstar a steep learning curve you
you superstar a steep learning curve
you superstar a steep learning curve me
RxJava in baby steps @brwngrldev
asynchronous data streams
GPS Updates Time
GPS Updates Time -36.34543, 3.23445
-36.34543, 3.23445
-36.24543, 3.23425
-35.34543, 3.13445 GPS Updates Time
Time server response
i want toys!!! Time server response
Time server response toys
Time i want toys!!! server response
Which of the following is an asynchronous data stream? A: click events B: database access C: server response D: all of the above
A: click events B: database access C: server response D: all of the above Which of the following is an asynchronous data stream?
scientific Research
Data Transformation Observable.just( 5 , 6 , 7 ) .map { ";-) " . repeat ( it ) }
.subscribe { println ( it ) }
Data Transformation Observable.just(5, 6, 7) .map { ";-) " . repeat ( it ) }
.subscribe { println ( it ) } ;-) ;-) ;-) ;-) ;-) ;-) ;-) ;-) ;-) ;-) ;-) ;-) ;-) ;-) ;-) ;-) ;-) ;-)
chaining Observable.just( 5 , 6 , 7 ) .map { ";-) " . repeat ( it ) }
.filter { it . length < 24 }
chaining ;-) ;-) ;-) ;-) ;-) Observable.just(5, 6, 7) .map { ";-) " . repeat ( it ) }
RxJava i s. . . ? A: the silver bullet B: simply magic C: pure voodoo D: a library
listOf ( 5 , 6 , 7 ) . map { it * 5 }
. filter { it
25 }
kotlin collections
kotlin collections 5 6 7
kotlin collections 5 6 7 map 25 30 35
kotlin collections 5 6 7 map 25 30 35 filter 30 35
listOf ( 5 , 6 , 7 ) . asSequence () . map { it * 5 }
. toList () kotlin sequences
kotlin sequences listOf ( 5 , 6 , 7 ) . asSequence () . map { it * 5 }
. toList () 5 6 7
5 6 7 map filter kotlin sequences 25 listOf ( 5 , 6 , 7 ) . asSequence () . map { it * 5 }
. toList ()
5 6 7 map filter kotlin sequences 25 30 30 listOf ( 5 , 6 , 7 ) . asSequence () . map { it * 5 }
5 6 7 map filter kotlin sequences 25 30 35 30 35 listOf ( 5 , 6 , 7 ) . asSequence () . map { it * 5 }
RxJava numbers
.
map { it * 5 }
. subscribe ()
. subscribe () 5 25 map filter
. subscribe () 5 6 map filter 25 30 30
. subscribe () 5 6 map filter 25 30 30 35 7 35 35
flexible threading
schedulers
Observable.just( 5 , 6 , 7 )
.map {
";-) " . repeat ( it ) }
.subscribe {
println ( it ) }
subscribeOn( Schedulers.io ())
The Basics observable
The Basics observable observer
The Basics operators observable observer
observable
hot observable Time
hot observable Time i want hugs!!!
hot observable Time hugs
hot observable Time hugs hugs
hot observable Time hugs hugs hugs
Cold observable Time
Cold observable Time hugs
Cold observable Time hugs hugs
Cold observable Time hugs hugs hugs
where?
Observable.create<Int> { subscriber
-> }
Observable.just( item1 , item2 , item3 )
Observable.just( item1 , item2 , item3 ) Observable .interval( 2 , TimeUnit. SECONDS )
Observable.create<Int> { subscriber ->
}
Logger.log( "create" ) Logger.log( "complete" ) } Logger.log( "done" )
Logger.log( "create" ) subscriber.onNext( 5 ) subscriber.onNext( 6 ) subscriber.onNext( 7 )
Logger.log( "complete" )
} Logger.log( "done" )
Logger.log( "create" ) subscriber.onNext(5) subscriber.onNext(6) subscriber.onNext(7) subscriber.onComplete() Logger.log( "complete" ) } Logger.log( "done" )
A: emit items B: be cold C: be hot D: all of the above Observables ca n. . .
observer
interface Observer<T
{
fun onError(e: Throwable)
fun onComplete()
fun onNext(t: T )
fun onSubscribe(d: Disposable) }
observer’s lifecycle onSubscribe
observer’s lifecycle onNext Normal flow onSubscribe
observer’s lifecycle onComplete Normal flow onSubscribe onNext
observer’s lifecycle onComplete onError Normal flow onSubscribe onNext
val observer = object : Observer<Int> {
override fun onError(e: Throwable) { Logger.log(e) }
override fun onComplete() { Logger.log( "on complete" ) }
override fun onNext(t: Int) { Logger.log( "next: $ t " ) }
override fun onSubscribe(d: Disposable) { Logger.log( "on subscribe" ) } }
interface Consumer<T
fun accept(t: T ) }
val consumer = object : Consumer<Int> {
override fun accept(t: Int) { Logger.log( "next: $ t " ) }
val consumer = Consumer <Int> { t -> Logger.log( "next: $ t " ) } obs.subscribe(consumer) consumer
obs.subscribe( Consumer <Int> { t -> Logger.log( "next: $ t " ) } ) consumer
obs.subscribe( { t -> Logger.log( "next: $ t " ) } ) consumer
obs.subscribe { t -> Logger.log( "next: $ t " ) }
consumer
obs.subscribe { Logger.log( "next: $it " ) }
A: have a lifecycle
B: always complete
C: Never Error
D: all of the above Observ ers. . .
Observ ers. . . A: have a lifecycle
D: all of the above
operator
Time Operator: map()
Time Operator: map() :-) :-(
Time Operator: map() :-) :-) :-( :-(
Time Operator: map() :-) :-) :-) :-( :-( :-(
Time Operator: map() :-) :-) :-) :-) :-( :-( :-( :-(
Operator: map() Observable.just(5, 6, 7) .map { ";-) " . repeat ( it ) }
Operator: map() ;-) ;-) ;-) ;-) ;-) ;-) ;-) ;-) ;-) ;-) ;-) ;-) ;-) ;-) ;-) ;-) ;-) ;-) Observable.just(5, 6, 7) .map { ";-) " . repeat ( it ) }
Operator: map() Observable.just(5, 6, 7) .map( object : Function<Int, String> {
override fun apply(t: Int): String {
return ";-) " . repeat (t) } })
.subscribe
{ println ( it ) }
.map(
object : Function<Int, String> {
return ";-) " . repeat (t) } }) .map { ";-) " . repeat ( it ) }
what’s the output? Observable.just( 1, 2, 3 ) .map { it * 2
A: 1, 2, 3
B: a, b, c
C: 2, 4, 6
D: 6, 2, 4 Observable.just( 1, 2, 3 ) .map { it * 2
.subscribe { println ( it ) } what’s the output?
Observable.just(1, 2, 3 ) .map { it * 2
.subscribe { println ( it ) } A: 1, 2, 3
D: 6, 2, 4 what’s the output?
.filter
{ it < 6
A: 2, 3, 1
B: 2, 4 C: 1, 2, 3
D: 6, 4 what’s the output? Observable.just( 1, 2, 3 ) .map { it * 2
what’s the output? Observable.just(1, 2, 3 ) .map { it * 2
.subscribe { println ( it ) } A: 2, 3, 1
D: 6, 4
Operator: flatmap() via reactivex.io
Operator: flatmap() via reactivex.io item
Operator: flatmap() via reactivex.io item observable
Operator: flatmap() Observable.just( , ) .flatMap( { Observable.just( ) } )
.subscribe { println ( it ) } :-( :-(
time flatmap
time flatmap :-(
time flatmap :-( :-(
Long Running asynchronous
Operator: flatmap() val users // Observable<User>
val posts: Observable<Post>
Operator: flatmap() val users // Observable<User> val posts: Observable<Post> posts = users.flatMap { getUsersPosts( it . id ) }
posts.subscribe { println ( it ) }
via reactivex.io
Flowable Maybe backpressure Disposable Single completable
should you use rxjava?
like functional programing? Process items asynchronously?
compose data? handle errors gracefully? should you use rxjava?
should you use rxjava? like functional programing? Process items asynchronously?
compose data? handle errors gracefully?
it depends
you
www.adavis.info @brwngrldev
• Reactive Programming on Android with RxJava ( http://amzn.to/2yOAkxn )
• Reactive Programming with RxJava ( http://amzn.to/2zQtqb5 )
• RxJava Playlist (https://goo.gl/9fw1Zv)
• Learning RxJava for Android Devs (https://goo.gl/VWxFLK )
• RxJava Video Course ( https://caster.io/courses/rxjava ) resources
slide design: @lauraemilyillustration font: Elliot 6, fontSquirrel.com