Iced Tea Labs Share to be shared

RxJava Introduction

What is RxJava?

Quoted from the RxJava home page:

RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.

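The two basic components in RxJava are Observable and Observer: an Observable emits items, and an Observer subscribes to an Observable and reacts to the items it emits.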

RxJava looks a lot like the Observer pattern, with one difference: until an Observer subscribes to the Observable, the Observable's code block does not execute.
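The lazy behaviour described above can be sketched in plain Java, without RxJava on the classpath. LazySource below is a hypothetical stand-in for an Observable, not RxJava's real implementation: it just stores the producer block and runs it only when subscribe() is called.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for an Observable: it only stores the producer block.
class LazySource {
    private final Consumer<Consumer<String>> producer;

    LazySource(Consumer<Consumer<String>> producer) {
        this.producer = producer;
    }

    // The producer block runs here, when an observer subscribes, and not before.
    void subscribe(Consumer<String> observer) {
        producer.accept(observer);
    }

    // Builds a source, then subscribes, recording the order of events.
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        LazySource source = new LazySource(observer -> {
            events.add("producer ran");
            observer.accept("Captain America");
            observer.accept("Iron Man");
        });
        events.add("source created");
        source.subscribe(item -> events.add("Hello, " + item));
        return events;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

In this sketch "source created" is recorded before "producer ran", because creating the source does not run the producer; only the subscribe() call does.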

That’s enough theory. Here is some simple sample code using RxJava:

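// Assumes RxJava 2.x and the Android SDK on the classpath.
import android.util.Log;
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.annotations.NonNull;
import io.reactivex.disposables.Disposable;

Observable<String> observable = Observable.just("Captain America", "Iron Man", "Black Widow", "Hulk", "Thor");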
Observer<String> observer = new Observer<String>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {

    }

    @Override
    public void onNext(String item) {
        Log.i("Avengers", "Hello, " + item);
    }

    @Override
    public void onError(Throwable t) {
        // Pass the Throwable itself: t.getMessage() may be null.
        Log.e("Avengers", "Error", t);
    }

    @Override
    public void onComplete() {
        Log.i("Avengers", "Complete");
    }
};
observable.subscribe(observer);

The observable emits the Avengers hero names to the observer, which prints a greeting for each one.

The result should be:

Avengers: Hello, Captain America
Avengers: Hello, Iron Man
Avengers: Hello, Black Widow
Avengers: Hello, Hulk
Avengers: Hello, Thor
Avengers: Complete

Types of Observable

RxJava provides 5 types of Observable: Observable, Flowable, Single, Maybe, and Completable.

Depending on your needs and use cases, you can pick the most suitable one.

Multithreading and Schedulers

My favourite part of using RxJava is how easily I can execute code on multiple threads!

RxJava supports writing asynchronous and concurrent code pretty well.

Because of that, many new Rx developers think that RxJava is multi-threaded by default!

NO, it’s not!

RxJava is single-threaded by default

For example:

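// Assumes RxJava 2.x and the Android SDK on the classpath.
import android.util.Log;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.annotations.NonNull;
import io.reactivex.disposables.Disposable;

final String[] avengers = new String[]{"Captain America", "Iron Man"};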
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
        Log.i("Avengers", "Observable Thread: " + Thread.currentThread().getName());
        for (String name : avengers) {
            emitter.onNext(name);
        }
        emitter.onComplete();
    }
});

Observer<String> observer = new Observer<String>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {
    }

    @Override
    public void onNext(String item) {
        Log.i("Avengers", "Hello, " + item);
    }

    @Override
    public void onError(Throwable t) {
        // Pass the Throwable itself: t.getMessage() may be null.
        Log.e("Avengers", "Error", t);
    }

    @Override
    public void onComplete() {
        Log.i("Avengers", "Done");
        Log.i("Avengers", "Observer Thread: " + Thread.currentThread().getName());
    }
};

observable.subscribe(observer);

The result should be:

Avengers: Observable Thread: main
Avengers: Hello, Captain America
Avengers: Hello, Iron Man
Avengers: Done
Avengers: Observer Thread: main

As you can see, if we simply subscribe directly, all of the code executes on the same thread.

To execute the code asynchronously, you must explicitly specify the thread by using subscribeOn(), observeOn(), and a Scheduler.

Scheduler

A Scheduler in RxJava defines the thread on which your code executes.

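By default, RxJava provides several types of Scheduler: Schedulers.io(), Schedulers.computation(), Schedulers.newThread(), Schedulers.single(), and Schedulers.trampoline(). On Android, the RxAndroid library adds AndroidSchedulers.mainThread().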

Depending on the use case, you can easily pick the right Scheduler.

Also, to make switching between threads more convenient, RxJava provides 2 methods: subscribeOn() and observeOn().

subscribeOn()

subscribeOn() specifies the Scheduler on which the code block inside the Observable.create() method executes.

By changing from

observable.subscribe(observer);

to

observable
    .subscribeOn(Schedulers.io())
    .subscribe(observer);

You can see that the code from the Observable now executes on a new thread rather than on the main thread as before.

Avengers: Observable Thread: RxCachedThreadScheduler-1
Avengers: Hello, Captain America
Avengers: Hello, Iron Man
Avengers: Done
Avengers: Observer Thread: RxCachedThreadScheduler-1
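One way to picture what subscribeOn() does is as a wrapper that hands the whole subscribe call to another thread. The plain-Java sketch below models that idea with an ExecutorService; the subscribeOn helper and the "io-1" thread name are hypothetical and are not RxJava's actual implementation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class SubscribeOnSketch {
    // Hypothetical helper: runs the whole producer block on the given executor.
    static void subscribeOn(ExecutorService executor,
                            Consumer<Consumer<String>> producer,
                            Consumer<String> observer) {
        executor.execute(() -> producer.accept(observer));
    }

    static List<String> demo() {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        // A single named thread stands in for Schedulers.io().
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-1"));

        subscribeOn(io,
            observer -> {
                log.add("Observable Thread: " + Thread.currentThread().getName());
                observer.accept("Captain America");
            },
            item -> log.add("Hello, " + item + " on " + Thread.currentThread().getName()));

        // Let the submitted work finish before reading the log.
        io.shutdown();
        try {
            io.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Because the observer is called directly from inside the producer block, its callback runs on the same "io-1" thread in this sketch.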

What's more, the Observer also executes its code block on the Scheduler defined in subscribeOn(). What if you want the Observer to execute on the main thread again?

This is where observeOn() jumps into the picture.

observeOn()

observeOn() specifies the Scheduler on which the downstream operators execute their code blocks.

By changing from

observable
    .subscribeOn(Schedulers.io())
    .subscribe(observer);

to

observable
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(observer);

You can see that the code from the Observable now executes on a new thread, while the Observer code runs on the main thread again.

Avengers: Observable Thread: RxCachedThreadScheduler-1
Avengers: Hello, Captain America
Avengers: Hello, Iron Man
Avengers: Done
Avengers: Observer Thread: main
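The thread hop performed by observeOn() can also be modelled in plain Java. This is only a sketch under stated assumptions: the observeOn helper and the "io-1" and "ui-1" thread names are hypothetical, with the "ui-1" executor standing in for AndroidSchedulers.mainThread(). RxJava's real operator works differently internally.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class ObserveOnSketch {
    // Hypothetical helper: wraps an observer so each item is re-delivered on the executor.
    static Consumer<String> observeOn(ExecutorService executor, Consumer<String> observer) {
        return item -> executor.execute(() -> observer.accept(item));
    }

    static void shutdownAndWait(ExecutorService executor) {
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    static List<String> demo() {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-1"));
        ExecutorService ui = Executors.newSingleThreadExecutor(r -> new Thread(r, "ui-1"));

        Consumer<String> observer =
            item -> log.add("Hello, " + item + " on " + Thread.currentThread().getName());
        Consumer<String> downstream = observeOn(ui, observer);

        // The producer block runs on "io-1"; each item then hops to "ui-1".
        io.execute(() -> {
            log.add("Observable Thread: " + Thread.currentThread().getName());
            downstream.accept("Captain America");
            downstream.accept("Iron Man");
        });

        // Finish the producer first so every item has been handed to "ui-1".
        shutdownAndWait(io);
        shutdownAndWait(ui);
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

The design choice to note is that the producer never knows about the second thread: only the observer is wrapped, which is why observeOn() affects what comes after it in the chain.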

I think this is enough for an introduction to RxJava. The next post will put the spotlight on RxJava operators!
