How to code (pt.1): RxJava + Retrofit

RxJava is a Java implementation of ReactiveX, a library for composing observable streams of data or events that builds on the Observer pattern. Retrofit is the HTTP client library we will use to describe REST endpoints as Java interfaces.

I would like to show you how to call RESTful API endpoints using RxJava and Retrofit 2. We will also see how to switch a Retrofit service method so that it returns an RxJava Observable.

RxJava and Retrofit in practice

Before we get started, let’s add the required dependencies to your Gradle build file:

compile 'io.reactivex:rxandroid:1.2.1'
compile 'io.reactivex:rxjava:1.1.6'

RxAndroid adds classes to RxJava that make writing reactive components in Android applications easy.

After that, add the Retrofit dependency:

compile 'com.squareup.retrofit2:retrofit:2.1.0'

If you are using Gson, add the Gson converter for Retrofit. A converter is needed to deserialize HTTP bodies into your custom model classes. Without one, Retrofit can only deserialize HTTP bodies into OkHttp’s ResponseBody.

compile 'com.squareup.retrofit2:converter-gson:2.1.0'

Here is a list of official converter modules provided by Square:

  • Gson: com.squareup.retrofit2:converter-gson
  • Jackson: com.squareup.retrofit2:converter-jackson
  • Moshi: com.squareup.retrofit2:converter-moshi
  • Protobuf: com.squareup.retrofit2:converter-protobuf
  • Wire: com.squareup.retrofit2:converter-wire
  • Simple XML: com.squareup.retrofit2:converter-simplexml

You can also create a custom converter yourself by extending the Converter.Factory class.
The next dependency lets our services return Observable objects:

compile 'com.squareup.retrofit2:adapter-rxjava:2.1.0'

With that in place, suppose we have defined the following service:

import retrofit2.Call;
import retrofit2.http.POST;

public interface APIService {
    @POST("list")
    Call<Car> loadCar();
}

To switch to an RxJava Observable, only one change is needed:

import retrofit2.http.POST;
import rx.Observable;

public interface APIService {
    @POST("list")
    Observable<Car> loadCar();
}

 

Now let’s modify the Retrofit builder and add the call adapter factory and the converter factory:

public Retrofit provideRetrofit() {
    return new Retrofit.Builder()
            .baseUrl("http://api.geo.org/")
            .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
            .addConverterFactory(GsonConverterFactory.create(new Gson()))
            .build();
}

Right, now let’s move to some examples, shall we?

Example 1

We can now call our first RESTful API method with RxJava. Let’s begin with a simple example that fetches a Car object.
We will use a simple activity that already holds an APIService instance and is ready to call loadCar() using RxJava.

public class Activity extends AppCompatActivity {
    // Assumed to be initialized elsewhere (for example, injected).
    APIService apiService;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        apiService.loadCar()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe();
    }
}

We want the request to run asynchronously. To do that, we call subscribeOn(Schedulers.io()), which executes the observable’s work on a background thread from the I/O scheduler instead of on the calling thread.

If we also want to receive the result on the main UI thread, we call observeOn(AndroidSchedulers.mainThread()), which means the result is emitted on the main UI thread.
The subscribe() method is the trigger. If you don’t call it, the request never executes.
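To make the roles of these three calls more concrete, here is a library-free sketch that models them with plain java.util.concurrent executors. LazyRequestSketch and everything in it is hypothetical and far simpler than RxJava’s real operators. It only illustrates three things: no work happens before subscribe(), the work runs on the executor passed to subscribeOn, and the result is delivered on the executor passed to observeOn. The thread names are made up for the demo.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical, simplified stand-in for an Observable; not an RxJava class. */
class LazyRequestSketch<T> {
    private final Supplier<T> work;
    private Executor workExecutor = Runnable::run;    // default: caller's thread
    private Executor resultExecutor = Runnable::run;  // default: same thread as the work

    LazyRequestSketch(Supplier<T> work) { this.work = work; }

    LazyRequestSketch<T> subscribeOn(Executor executor) { workExecutor = executor; return this; }

    LazyRequestSketch<T> observeOn(Executor executor) { resultExecutor = executor; return this; }

    /** The trigger: the work is not started until this method is called. */
    void subscribe(Consumer<T> onNext) {
        workExecutor.execute(() -> {
            T value = work.get();
            resultExecutor.execute(() -> onNext.accept(value));
        });
    }

    /** Returns {calls before subscribe, thread that did the work, thread that got the result}. */
    static String[] demo() {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-thread"));
        ExecutorService main = Executors.newSingleThreadExecutor(r -> new Thread(r, "main-thread"));
        AtomicInteger calls = new AtomicInteger();
        String[] seen = new String[3];
        CountDownLatch done = new CountDownLatch(1);

        LazyRequestSketch<String> request = new LazyRequestSketch<String>(() -> {
            calls.incrementAndGet();
            seen[1] = Thread.currentThread().getName();
            return "car";
        }).subscribeOn(io).observeOn(main);

        seen[0] = String.valueOf(calls.get());  // still 0: building the chain ran nothing
        request.subscribe(value -> {
            seen[2] = Thread.currentThread().getName();
            done.countDown();
        });

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            io.shutdown();
            main.shutdown();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(String.join(", ", demo()));
    }
}
```

In the real libraries, Schedulers.io() and AndroidSchedulers.mainThread() play the part of the two executors used here.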
If you take a closer look at subscribe(), you will notice that it has several overloads. I prefer the ones that let me handle errors, and I will explain why below.
Let’s modify our request and pass a Subscriber to subscribe().
It should look like this:

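public class Activity extends AppCompatActivity {
    // Assumed to be initialized elsewhere (for example, injected).
    APIService apiService;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        apiService.loadCar()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Car>() {
                    @Override
                    public void onCompleted() {
                        // Called after onNext when the request finished successfully.
                    }

                    @Override
                    public void onError(Throwable e) {
                        // Called instead of onNext/onCompleted when the request fails.
                    }

                    @Override
                    public void onNext(Car car) {
                        // Called with the deserialized response.
                    }
                });
    }
}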

There are a couple of important things to note. Let’s start with onNext. This method delivers the response to the loadCar request, in our example a Car object, and immediately afterwards onCompleted is executed.

onNext is called every time unless an error occurs, for example a JSONException or an IOException. In that case onError is called instead of onNext, and onCompleted is not executed.
There are a few cases in which onCompleted and onNext are executed instead of onError even though an error occurred, but that is a subject for the next tutorial.
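The callback order described above can be modelled without any library. The sketch below is only an illustration: MiniSubscriber and ObserverContractSketch are hypothetical names, not RxJava classes, and the model covers just the single-response case from this example (one onNext followed by onCompleted, or onError alone).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

/** Hypothetical stand-in for an RxJava Subscriber; not the library's class. */
interface MiniSubscriber<T> {
    void onNext(T value);
    void onError(Throwable e);
    void onCompleted();
}

class ObserverContractSketch {
    /** Runs one "request" and notifies the subscriber the way the text describes. */
    static <T> void emit(Callable<T> request, MiniSubscriber<T> subscriber) {
        T value;
        try {
            value = request.call();
        } catch (Exception e) {
            subscriber.onError(e);   // failure: onError only, no onNext, no onCompleted
            return;
        }
        subscriber.onNext(value);    // success: the single response
        subscriber.onCompleted();    // followed immediately by completion
    }

    /** Records which callbacks fire, in order. */
    static List<String> record(Callable<String> request) {
        List<String> events = new ArrayList<>();
        emit(request, new MiniSubscriber<String>() {
            @Override public void onNext(String value) { events.add("onNext(" + value + ")"); }
            @Override public void onError(Throwable e) { events.add("onError(" + e.getMessage() + ")"); }
            @Override public void onCompleted() { events.add("onCompleted"); }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(record(() -> "car"));
        System.out.println(record(() -> { throw new java.io.IOException("timeout"); }));
    }
}
```

The first call records onNext followed by onCompleted, and the second records only onError.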

Example 2

Let’s see what happens when we use the subscribe method without parameters. If we look at the source of Observable, we find several overloaded subscribe methods. Let’s check the no-argument subscribe():

public final Subscription subscribe() {
    Action1<T> onNext = Actions.empty();
    Action1<Throwable> onError = InternalObservableUtils.ERROR_NOT_IMPLEMENTED;
    Action0 onCompleted = Actions.empty();
    return subscribe(new ActionSubscriber<T>(onNext, onError, onCompleted));
}

On the surface everything looks fine: we have three action objects, one for each callback of our request. What looks odd is the static value:

InternalObservableUtils.ERROR_NOT_IMPLEMENTED

Taking a closer look, in the InternalObservableUtils class we find:

public static final Action1<Throwable> ERROR_NOT_IMPLEMENTED = new ErrorNotImplementedAction();

The call method of the implementing class, ErrorNotImplementedAction, throws an `OnErrorNotImplementedException`:

public void call(Throwable t) {
    throw new OnErrorNotImplementedException(t);
}

Final note

So if we subscribe to an observable using subscribe() without parameters, it will work fine as long as nothing goes wrong. But as I wrote above, a request can fail with an error that is normally handled by onError. In that case the app will crash with an OnErrorNotImplementedException.
We should handle errors on our side, either by passing an onError action or by providing a Subscriber object.
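The difference between the two styles can be shown with a small library-free model. SubscribeSketch is a hypothetical class, not RxJava code, and IllegalStateException is used here only as a stand-in for OnErrorNotImplementedException. In RxJava 1.x itself, the overload that takes your own error action is subscribe(Action1<? super T> onNext, Action1<Throwable> onError).

```java
import java.util.concurrent.Callable;
import java.util.function.Consumer;

/** Hypothetical, simplified model of the two subscribe styles; not RxJava code. */
class SubscribeSketch<T> {
    private final Callable<T> source;

    SubscribeSketch(Callable<T> source) { this.source = source; }

    /** Models the no-argument subscribe(): the default error action just throws. */
    void subscribe() {
        subscribe(value -> { }, error -> {
            // Stand-in for RxJava's OnErrorNotImplementedException.
            throw new IllegalStateException("onError not implemented", error);
        });
    }

    /** Models subscribing with your own onNext and onError actions. */
    void subscribe(Consumer<T> onNext, Consumer<Throwable> onError) {
        T value;
        try {
            value = source.call();
        } catch (Exception e) {
            onError.accept(e);
            return;
        }
        onNext.accept(value);
    }

    /** A request that always fails, as a network call might. */
    static SubscribeSketch<String> failingRequest() {
        return new SubscribeSketch<String>(() -> { throw new java.io.IOException("no network"); });
    }

    static String withoutHandler() {
        try {
            failingRequest().subscribe();
            return "no crash";
        } catch (IllegalStateException crash) {
            return "crash: " + crash.getCause().getMessage();
        }
    }

    static String withHandler() {
        StringBuilder shown = new StringBuilder();
        failingRequest().subscribe(
                car -> shown.append("car: ").append(car),
                error -> shown.append("handled: ").append(error.getMessage()));
        return shown.toString();
    }

    public static void main(String[] args) {
        System.out.println(withoutHandler());
        System.out.println(withHandler());
    }
}
```

In the model, the same failure escapes as an exception in the first case and reaches the caller’s own handler in the second.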

I hope you enjoyed this coding tutorial. You can expect more in the future.
