Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

深入RxJava之Subject #138

Open
soapgu opened this issue Apr 23, 2022 · 0 comments
Open

深入RxJava之Subject #138

soapgu opened this issue Apr 23, 2022 · 0 comments
Labels
JAVA This doesn't seem right ReactiveX ReactiveX

Comments

@soapgu
Copy link
Owner

soapgu commented Apr 23, 2022

  • Subject概念

在RxJava里面Subject是一个观察者,同时也是可观察者对象,就是个双面功能类,很拗口,只可意会不可言传。
实在读不懂代码多写两遍。
Subject是个特殊的存在,他直接具有“发射”数据的功能。
其实相对其他RX对象,他的存在很好补充了api。一般来说很多Rx对象都是我们create出来的,通过数据,回调,事件,等等。
而subject可以原地“发射”数据。比其他对象更好理解一点
其实从学习角度来说,Subject可以先学,但是个人觉得从使用上Subject靠后,还是以基础类型为主。

先拉清单

  1. Subject
    这个class是整个Subject的抽象类,他是整个Subject家族的课代表。
    这是abstract的,没有具体实现。
    有个特别有意思的事情,onNext是作为Observer收数据的,而Subject作为Observable职能,又是通过onNext来“发射”数据的,看源代码的时候有点精神分裂的感觉

接下来的解释有点简单粗暴,只有看图说话

2.AsyncSubject
图片

只有complete才会收到最后收到的数据

3.BehaviorSubject
图片

订阅瞬间会收到最后一个“缓存”数据,后面所有的发射数据都会收到

  1. PublishSubject

A Subject that emits (multicasts) items to currently subscribed Observers and terminal events to current or late Observers.

关键字:支持多播,实时数据发送

图片
另外看图感受下

我觉得这个类是最实用的

  1. ReplaySubject
    图片

能缓存历史数据,但是容量有限,只适合少量数据

  1. UnicastSubject
    图片

PublishSubject的单播版,如果没有人订阅数据会缓存,只接受一个订阅,多了直接失败。主动停止或者取消订阅结束感觉功能“变弱”,也许特殊场景有效,暂时用不到

  1. CompletableSubject
  2. MaybeSubject
  3. SingleSubject

789一起解释下,就是Completable/Maybe/Single的同款,感觉是鸡肋,我用相关的create或者just也能完成。

光说不练假把式。选择使用PublishSubject作为练习目标,看看能不能达到“实时”发送,多播订阅的目的

package com.soap.testrxsubject;

import androidx.appcompat.app.AppCompatActivity;

import android.os.Bundle;
import android.widget.Button;
import android.widget.TextView;

import java.util.concurrent.TimeUnit;

import io.reactivex.rxjava3.android.schedulers.AndroidSchedulers;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.disposables.CompositeDisposable;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.subjects.PublishSubject;
import io.reactivex.rxjava3.subjects.Subject;

public class MainActivity extends AppCompatActivity {
    private Button timerButton;
    private Button sub1Button;
    private Button sub2Button;
    private TextView sub1TextView;
    private TextView sub2TextView;
    private Disposable timerDispose;
    private Disposable sub1Task;
    private Disposable sub2Task;
    private final Subject<String> subject = PublishSubject.create();

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        this.timerButton = findViewById(R.id.timer_button);
        this.sub1Button = findViewById(R.id.sub1_button);
        this.sub2Button = findViewById(R.id.sub2_button);
        this.sub1TextView = findViewById(R.id.sub1_tv);
        this.sub2TextView = findViewById(R.id.sub2_tv);

        timerButton.setOnClickListener( v -> {
            if( timerDispose != null ){
                timerDispose.dispose();
                timerDispose = null;
                timerButton.setText( R.string.start );
            }
            else {
                timerDispose = Observable.interval(1, TimeUnit.SECONDS)
                        .subscribe(t -> subject.onNext(String.format("broadcast:%s", t)));
                timerButton.setText( R.string.end );
            }
        } );

        sub1Button.setOnClickListener( v -> {
            if( sub1Task != null ) {
                if( !sub1Task.isDisposed() ) {
                    sub1Task.dispose();
                }
                sub1Task = null;
                sub1Button.setText( R.string.start );
            }
            else{
                sub1Task = subject.observeOn(AndroidSchedulers.mainThread())
                        .subscribe( t-> this.sub1TextView.setText( t ) );
                sub1Button.setText( R.string.end );
            }
        } );

        sub2Button.setOnClickListener(v->{
            if( sub2Task != null ) {
                if( !sub2Task.isDisposed() ) {
                    sub2Task.dispose();
                }
                sub2Task = null;
                sub2Button.setText( R.string.start );
            }
            else{
                sub2Task = subject.observeOn(AndroidSchedulers.mainThread())
                        .subscribe( t-> this.sub2TextView.setText( t ) );
                sub2Button.setText( R.string.end );
            }
        });
    }
}

代码比较简单

  1. timerButton来控制整个PublishSubject的发布,通过interval实现每秒增加计数加1发送的效果
  2. sub1Button和sub2Button分别控制手工进行订阅,获取PublishSubject的数据并显示出来

实际效果

  • 实时订阅能及时获取最新的计数
  • PublishSubject“总开关”达到目的,两个订阅的数据同步播放和暂停

相关仓库

@soapgu soapgu added JAVA This doesn't seem right ReactiveX ReactiveX labels Apr 23, 2022
soapgu added a commit that referenced this issue Apr 24, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
JAVA This doesn't seem right ReactiveX ReactiveX
Projects
None yet
Development

No branches or pull requests

1 participant