Reactive programming is a programming style where the consumer reacts to the data as it comes in. Asynchronous programming is also called reactive programming. In reactive a programming observables are allowed to propagate event changes to registered observers.
"The Observer pattern has done right. ReactiveX is a combination of the best ideas from the Observer pattern, the Iterator pattern, and functional programming."
Building blocks for RxJava2.x
Observables : Representing sources of dataSubscribers (or observers) : Listens to the observables
Methods : A set of methods for modifying and composing the data
An observable emits items; a subscriber consumes those items.
Observables :
As mentioned above Observables are the sources for the data. Observable objects that emit a stream of data and then terminate. It can terminate either successfully or with an error.Subscribers :
A subscriber objects that subscribe to Observables. An Observer receives a notification each time their assigned Observable emits a value, an error, or a completed signal.The need for asynchronous programming
Asynchronous programming refers to a style of structuring a program whereby a call to some unit of functionality triggers an action that is allowed to continue outside of the ongoing flow of the program.In the simple word when we get a task which may take a long time like I/O operations, HTTP calls, Database calls its preferred perform it outside the ongoing flow.
Reactive programming facilitates a simple way of asynchronous programming. It also provides a defined way of handling multiple events, errors and termination of the event stream. Reactive programming also makes simple the way of running different tasks in different threads.
Using reactive programming it is also possible to convert the stream before its received by the observers. Also, you can chain operations, e.g., if an API call depends on the call of another API Last but not least, reactive programming reduces the need for state variables, which can be the source of errors.
Adding RxJava2.x to a Java/Android project
For Gradle :
compile group: 'io.reactivex.rxjava2', name: 'rxjava', version: '2.1.1'
For Android :
compile 'io.reactivex.rxjava2:rxjava:2.0.8'
For Maven :
High level steps for RxJava2.x
- Creating an Observable.
- Giving that Observable some data to emit.
- Creating an Observer.
- Assigning the Observer to an Observable.
- Giving the Observer tasks to perform whenever it receives an emission from its assigned Observable.
Type | Description |
---|---|
Flowable |
Emits 0 or n items and terminates with a success or an error event. Supports backpressure, which allows controlling how fast a source emits items.
|
Observable |
Emits 0 or n items and terminates with a success or an error event.
|
Single |
Emits either a single item or an error event. The reactive version of a method call.
|
Maybe |
Succeeds with an item, or no item, or errors. The reactive version of an
Optional . |
Completable |
Either complete with a success or with an error event. It never emits items. The reactive version of a
Runnable . |
Android Example :
Creating observable
Observable observable = Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
//Use onNext to emit each item in the stream//
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onNext(4);
//Once the Observable has emitted all items in the sequence, call onComplete//
e.onComplete();
}
});
or Using lambdas, the same statement can be expressed as:@Override
public void subscribe(ObservableEmitter
});
Observable observable = Observable.create(e -> {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onNext(4);
//Once the Observable has emitted all items in the sequence, call onComplete//
e.onComplete();
});
e.onNext(1);
e.onNext(2);
e.onNext(4);
//Once the Observable has emitted all items in the sequence, call onComplete//
e.onComplete();
});
Creating an Observer
Observer observer = new Observer() {
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG, "onSubscribe: ");
}
@Override
public void onNext(Integer value) {
Log.e(TAG, "onNext: " + value);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete: All Done!");
}
};
//Create our subscription//
observable.subscribe(observer);
@Override
public void onSubscribe(Disposable d) {
@Override
public void onNext(Integer value) {
@Override
public void onError(Throwable e) {
@Override
public void onComplete() {
};
//Create our subscription//
observable.subscribe(observer);
Some other Convenience methods to create observables :
Observable.just("Hello") - Allows to create an observable as wrapper around other data types
Observable.fromIterable() - takes an java.lang.Iterable
Observable.fromArray() - takes an array and emits their values in their order in the data structure
Observable.fromCallable() - Allows to create an observable for a java.util.concurrent.Callable
Observable.fromFuture() - Allows to create an observable for a java.util.concurrent.Future
Observable.interval() - An observable that emits Long objects in a given interval
Similar methods exists for the other data types,
Unsubscribe to avoid memory leaks
if (subscription != null && !subscription.isDisposed()) {
subscription.dispose();
}
subscription.dispose();
}
It is really a nice post.
ReplyDeleteIf you are looking for best Web Development Services