Rxjava Part1

RxJava是响应式程序设计的一种实现。在响应式程序设计中,当数据到达的时候,消费者做出响应。响应式编程可以将事件传递给注册了的observer。这是基于RxJava1.x的。

开始

可以使用Observable.create()创建一个Observable。需要传入一个继承自Action1的OnSubscribe对象。当观察者订阅可以使用Observable的时候,他作为一个参数传入并执行call方法。

@Test
   public void test() {
       Observable<Integer> integerObservable = Observable.create(subscriber -> {
           for (int i = 0; i < 5; i++) {
               subscriber.onNext(i);
                    System.out.println("ok this is " + i);
           }
       });


       Subscription subscription = integerObservable.subscribe(new Observer<Integer>() {
           @Override
           public void onCompleted() {
               System.out.println("onCompleted");
           }

           @Override
           public void onError(Throwable e) {
               System.out.println("onError");
           }

           @Override
           public void onNext(Integer integer) {
               System.out.println("Item is " +integer);
           }
       });

   }

打印的数据为:

ok this is 0
Item is 0
ok this is 1
Item is 1
ok this is 2
Item is 2
ok this is 3
Item is 3
ok this is 4
Item is 4
onCompleted

integerObservable执行了一个循环不断发射整数,订阅了integerObservable之后返回的一个subscription就开始不断接收这些整数,并执行在onNext()中相印的操作。

我们也使用Observable.from()从一个列表或者数组创建Observable.

Observable<Integer> observable = Observable.from(List.of(1, 3, 4, 5));//这里创建集合的方式是java9的新特性

如果想把一个方法或者对象甚至一个字符串转换成Observable需要怎么做呢?可以使用Observable.just(),just()也方法有九个参数,他会按照掺入顺序一次发射,但如果参数中有数组或者列表,他不会像from()那样迭代列表发射每个值。

@Test
    public void test() {
        Observable<Person> personObservable = Observable.just(new Person(18, "熊禹"), new Person(19, "熊禹"));
        personObservable.subscribe(new Subscriber<Person>() {
            @Override
            public void onCompleted() {
                System.out.println("onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError");
            }

            @Override
            public void onNext(Person person) {
                System.out.println("onNext"+person);
            }
        });


        Observable.just(List.of(1,2,3,4,5)).subscribe(new Subscriber<List<Integer>>() {
            @Override
            public void onCompleted() {
                System.out.println("onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError");
            }

            @Override
            public void onNext(List<Integer> integers) {
                System.out.println("onNext:"+integers);
            }
        });
    }

打印结果为:

onNextPerson{age=18, name='熊禹'}
onNextPerson{age=19, name='熊禹'}
onCompleted
onNext:[1, 2, 3, 4, 5]
onCompleted

通常,可以使用Observable.just()来发送已经定义好的布局有时变性的数据。

Subject

Subject有两种用途:
- 做为observable向其他的observable发送事件
- 做为observer接收其他的observable发送的事件。

PublishSubject


@Test public void test() { PublishSubject<String> publishSubject = PublishSubject.create(); publishSubject.subscribe(new Subscriber<String>() { @Override public void onCompleted() { System.out.println("onCompleted"); } @Override public void onError(Throwable e) { System.out.println("onError"); } @Override public void onNext(String s) { System.out.println("onNext" + s); } }); publishSubject.onNext("Talk is cheap,Show me the code"); publishSubject.onNext("Talk is cheap,Show me the code2"); publishSubject.onCompleted(); publishSubject.onNext("Talk is cheap,Show me the code3"); }

下面是一个可以连接Observable同时也可以被观测的实体

@Test
 public void test() {

     PublishSubject<Boolean> publishSubject = PublishSubject.create();
     publishSubject.subscribe(new Observer<Boolean>() {
         @Override
         public void onCompleted() {

         }

         @Override
         public void onError(Throwable e) {

         }

         @Override
         public void onNext(Boolean aBoolean) {
             System.out.println("onNext:" + aBoolean);
         }
     });


     Observable.create((Observable.OnSubscribe<Integer>) subscriber -> {
         for (int i = 0; i < 10; i++) {

             subscriber.onNext(i);
         }
         subscriber.onCompleted();
     }).doOnCompleted(new Action0() {//执行onCompleted() 之后执行的操作:
         @Override
         public void call() {
             publishSubject.onNext(true);
         }
     }).subscribe();//空的subscribe为了开启Observable
 }

Observable结束的时候,发射一个true。这个时候publishSubject的onNext()得到执行。

BehaviorSubject

BehaviorSubject会默认向他的订阅者发送一个默认值,然后正常发送订阅后的数据

    @Test
    public void test() {
        BehaviorSubject<Integer> behaviorSubject = BehaviorSubject.create(1);//BehaviorSubject会默认向他的订阅者发送一个默认值,然后正常发送订阅后的数据
        behaviorSubject.subscribe(new Observer<Integer>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(Integer integer) {
                System.out.println("onNext:"+integer);
            }
        });

        behaviorSubject.onNext(2);
        behaviorSubject.onNext(3);
        behaviorSubject.onNext(4);
        behaviorSubject.onNext(5);

        //打印结果;
        //onNext:1 //这个就是默认值
        //onNext:2
        //onNext:3
        //onNext:4
        //onNext:5

ReplaySubject

ReplaySubject会缓存他所订阅的所有数据

@Test
  public void test()
  {

      ReplaySubject<Integer> replaySubject=ReplaySubject.create();
      replaySubject.subscribe(x -> System.out.println("first:"+x));
      replaySubject.onNext(1);
      replaySubject.onNext(3);

      replaySubject.subscribe(x -> System.out.println("second:"+x));//a
      replaySubject.onNext(4);

//        first:1
//        first:3
//        second:1
//        second:3
//        first:4
//        second:4 因为没有执行onCComplete,所以在a处执行之后知情的订阅依旧会获得消息。

  }

AsyncSubject

AsyncSubject只会发布最后一个数据给他的订阅者

@Test
   public void test() {
       AsyncSubject<Integer> asyncSubject = AsyncSubject.create();

       asyncSubject.subscribe(System.out::println);
       asyncSubject.subscribe(new Observer<Integer>() {
           @Override
           public void onCompleted() {

           }

           @Override
           public void onError(Throwable e) {

           }

           @Override
           public void onNext(Integer integer) {
               System.out.println("onNext:" + integer);
           }
       });
       asyncSubject.onNext(1);
       asyncSubject.onNext(2);
       asyncSubject.onNext(3);
       asyncSubject.onNext(4);
       asyncSubject.onCompleted();//因为是要发送最新一条数据,所以必须要在onCompleted()之心个之后才会有结果

   }

输出结果:

4
onNext:4

One thought on “Rxjava Part1

发表评论

电子邮件地址不会被公开。 必填项已用*标注