Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rxjs那些事儿 #256

Open
FrankKai opened this issue Apr 13, 2022 · 11 comments
Open

Rxjs那些事儿 #256

FrankKai opened this issue Apr 13, 2022 · 11 comments

Comments

@FrankKai
Copy link
Owner

FrankKai commented Apr 13, 2022

  • of
  • from
  • map
  • timer
  • interval
  • Observable
  • Subject
  • BehaviorSubject(与Subject区别、在medusa中的应用)
  • Rxjs流最简流程(创建流、处理流、启动流、停止流)
  • Rxjs在React框架中使用
  • Rxjs方式改写react-request-queue
@FrankKai
Copy link
Owner Author

of

将参数转化为一个可观察序列。

每个参数都会作为下一个通知。
image

示例

发射数据10,20,30

import { of } from 'rxjs';
of(10,20,30).subscribe(
 next => console.log('next:', next),
  err => console.log('error:', err),
  () => console.log('the end'),
)

发射数组[]10,20,30]

import { of } from 'rxjs';
 
of([1, 2, 3])
.subscribe(
  next => console.log('next:', next),
  err => console.log('error:', err),
  () => console.log('the end'),
);
 
// Outputs
// next: [1, 2, 3]
// the end

@FrankKai
Copy link
Owner Author

from

将数组,类数组,Promise,可迭代对象等等,转化为Observable。
一句话总结:将万物转化为Observable
image

例子

同步流

import { from } from 'rxjs';

const array = [10, 20, 30];
const result = from(array);

console.log('start');

// 同步订阅
result.subscribe((x) => console.log(x));

console.log('end');

// Logs:
// 10
// 20
// 30

异步流

import { from, asyncScheduler } from 'rxjs';
 
console.log('start');
 
const array = [10, 20, 30];
const result = from(array, asyncScheduler);
 
result.subscribe(x => console.log(x));
 
console.log('end');
 
// Logs:
// start
// end
// 10
// 20
// 30

@FrankKai
Copy link
Owner Author

map

对observable发出的值,都经过map函数的处理,并且作为结果返回。
image

对每个值都乘以10

import { of, map } from 'rxjs';

of(1, 2, 3)
  .pipe(map((num) => num * 10))
  .subscribe(
    (next) => console.log('next:', next),
    (err) => console.log('error:', err),
    () => console.log('the end')
  );
// next:10
// next:20
// next:30
// end

@FrankKai
Copy link
Owner Author

timer

timer observable很适合在代码中创建延时,或者是与其他特定的值做race。
delay默认情况下是毫秒。

每隔2s,执行一次回调函数(0~2秒内不触发)

import { timer } from 'rxjs';

timer(0, 2000).subscribe((n) => console.log('timer', n));

@FrankKai
Copy link
Owner Author

interval

可用于准时返回递增数字。
image

例子

每秒+1,加到3秒

import { interval } from 'rxjs';
import { take } from 'rxjs/operators';
 
const numbers = interval(1000);
 
const takeFourNumbers = numbers.pipe(take(4));
 
takeFourNumbers.subscribe(x => console.log('Next: ', x));
 
// Logs:
// Next: 0
// Next: 1
// Next: 2
// Next: 3

每隔2s,执行一次回调函数(0~2秒内触发)

import { interval } from 'rxjs';

interval(2000).subscribe((n) => console.log('timer', n));

@FrankKai
Copy link
Owner Author

Observable

通过Observable创建一个流,并且通过subscriber.next控制输出。

import { Observable } from 'rxjs';

const stream$ = new Observable((subscriber) => {
  setTimeout(() => {
    subscriber.next([1, 2, 3]);
  }, 500);
  setTimeout(() => {
    subscriber.next({ a: 1000 });
  }, 1000);
  setTimeout(() => {
    subscriber.next('end');
  }, 3000);
  setTimeout(() => {
    subscriber.complete();
  }, 4000);
});

// 启动流
stream$.subscribe({
  complete: () => console.log('done'),
  next: (v) => console.log(v),
  error: () => console.log('error'),
});

@FrankKai
Copy link
Owner Author

FrankKai commented Apr 13, 2022

Subject

What is a Subject? An RxJS Subject is a special type of Observable that allows values to be multicasted to many Observer

是一种特殊类型的Observable,可以将消息同时广播给多个Observer,也就是多播。
而普通Observable则对应一个Observer,是单播。
可以理解为最简发布订阅。

import { Subject } from 'rxjs';

// 创建subject
const subject = new Subject();

// 订阅一个observer
subject.subscribe((v) => console.log('stream 1', v));
// 再订阅一个observer
subject.subscribe((v) => console.log('stream 2', v));
// 延时1s再订阅一个observer
setTimeout(() => {
  subject.subscribe((v) => console.log('stream 3', v));
}, 1000);
// 产生数据1
subject.next(1);
// 产生数据2
subject.next(2);
// 延时3s产生数据3
setTimeout(() => {
  subject.next(3);
}, 3000);
// output
// stream 1 1 //立即输出
// stream 2 1 //立即输出
// stream 1 2 //立即输出
// stream 2 2 //立即输出
// stream 1 3 //3s后输出
// stream 2 3 //3s后输出
// stream 3 3 //3s后输出

@FrankKai
Copy link
Owner Author

BehaviorSubject(与Subject区别、在medusa中的应用)

Subject的变体,需要初始值,并在订阅时发出其当前值。

注意:与Subject不同的是,需要初始值,而且subject.next会在所有订阅函数中执行(无论前后位置)。

import { BehaviorSubject } from 'rxjs';

const subject = new BehaviorSubject(123);

subject.subscribe(console.log);
subject.subscribe(console.log);

subject.next(456);

subject.subscribe(console.log);

subject.next(789);

// 123
// 123
// 456
// 456
// 456
// 789
// 789
// 789

Subject只会被位于subject.next之前订阅函数处理。

const subject = new Subject();

subject.subscribe(console.log);
subject.subscribe(console.log);

subject.next(101112); // 注意这里,只打印了2次

subject.subscribe(console.log);

subject.next(131415);
// 101112
// 101112
// 131415
// 131415
// 131415

tuya的微前端框架medusa,有用到BehaviorSubject这个类,用于主子应用做通信。

源码地址:https://github.com/tuya/medusa/blob/main/packages/medusa/src/packages/store.ts

发送数据

import { dispatch } from 'mmed/client';
dispatch("Hello World", false, 'foo')

接收数据

import { subscribe } from 'mmed/client';

useEffect(() => {
  const stream$ = subscribe((data) => {
    console.log(data) // Hello World
  }, 'foo');
  return () => {
    stream$.unsubscribe();
  };
}, [])

@FrankKai
Copy link
Owner Author

FrankKai commented Apr 13, 2022

Rxjs流最简流程(创建流、处理流、启动流、停止流)

分为4步:

  1. 创建流
  2. 处理流(或者说消费流)
  3. 启动流
  4. 停止流
import { Observable } from "rxjs";

// 创建流 - Observable
const stream$ = new Observable(subscriber => {
  setTimeout(() => {
    subscriber.next([1, 2, 3]);
  }, 500);
});

// 处理流(消费流产生的数据)- Observer 
const observer = {
  complete: () => console.log("done"),
  next: v => console.log(v),
  error: () => console.log("error")
};

// 启动流 - Subscription
const subscription = stream$.subscribe(observer);

setTimeout(() => {
  // 停止流
  subscription.unsubscribe();
}, 1000);

@FrankKai
Copy link
Owner Author

Rxjs在React框架中使用

这是一个例子,模拟了如何使用Rxjs,改造 前端请求接口后返回的数据,与prop数组做拼接 这种场景。

传统方式

import * as React from 'react';

const GreetSomeone = ({ greet = 'Hello' }) => {
  const [greeting, setGreeting] = React.useState('');
  const [name, setName] = React.useState('');

  React.useEffect(() => {
    setTimeout(() => {
      setName('World');
    }, 3000);
  }, []);

  React.useEffect(() => {
    setGreeting(`${greet}, ${name}!`);
  }, [greet, name]);

  return <p>{greeting}</p>;
};

export default GreetSomeone;

rxjs方式

import * as React from 'react';
import { combineLatest, from, of, Observable, BehaviorSubject } from 'rxjs';
import { catchError, map, startWith } from 'rxjs/operators';

const GreetSomeone = ({ greet = 'Hello' }) => {
  const [greeting, setGreeting] = React.useState('');
  // 创建greet流
  const greet$ = React.useRef(new BehaviorSubject(greet));

  React.useEffect(() => {
    greet$.current.next(greet);
  }, [greet]);

  React.useEffect(() => {
    console.log('创建流');
    // 创建name流
    const name$ = from(
      // 模拟远程搜索数据
      new Observable((subscriber) => {
        setTimeout(() => {
          subscriber.next('World');
        }, 3000);
      })
    ).pipe(
      startWith('_____'),
      catchError(() => of('Mololongo'))
    );
    // 创建greet和name合并流
    const greeting$ = combineLatest(greet$.current, name$).pipe(
      map(([greet, name]) => `${greet}, ${name}!`)
    );

    // 启动合并流
    const subscription = greeting$.subscribe((value) => {
      setGreeting(value);
    });

    return () => {
      subscription.unsubscribe();
    };
  }, []);

  return <p>{greeting}</p>;
};

export default GreetSomeone;

demo地址:https://stackblitz.com/edit/react-ts-jmk8wd?file=Hello.tsx

@FrankKai
Copy link
Owner Author

FrankKai commented Oct 23, 2024

Rxjs方式改写react-request-queue

concatMap 特别适用于当你需要确保异步任务按顺序执行,并且下一个任务需要等待上一个任务完成后再执行的场景。例如,发起一系列需要按顺序完成的 HTTP 请求。
队列方式

import { useRef, useEffect } from "react";

type RequestParams = any;
type RequestFunction = (params: RequestParams) => Promise<void>;

const useRequestQueue = (
  requestFunction: RequestFunction,
  dependencies: any[]
) => {
  const isRequestInProgress = useRef(false);
  const requestQueue = useRef<{ params: RequestParams }[]>([]);

  const processQueue = async () => {
    if (requestQueue.current.length === 0 || isRequestInProgress.current) {
      return;
    }

    isRequestInProgress.current = true;
    const { params } = requestQueue.current.shift();

    try {
      await requestFunction(params);
    } catch (error) {
      console.error("Error processing request:", error);
    } finally {
      isRequestInProgress.current = false;
      processQueue();
    }
  };

  useEffect(() => {
    processQueue();
  }, dependencies);

  const addToQueue = (params: RequestParams) => {
    requestQueue.current.push({ params });
    processQueue();
  };

  return { addToQueue };
};

export default useRequestQueue;

rxjs方式改写

import { useEffect } from "react";
import { Subject } from "rxjs";
import { concatMap } from "rxjs/operators";

type RequestParams = any;
type RequestFunction = (params: RequestParams) => Promise<void>;

const useRequestQueueWithRxJS = (
  requestFunction: RequestFunction,
  dependencies: any[]
) => {
  const requestQueue$ = new Subject<RequestParams>();

  useEffect(() => {
    const subscription = requestQueue$
      .pipe(
        // 使用 concatMap 来确保请求按顺序执行
        concatMap((params) => requestFunction(params))
      )
      .subscribe({
        error: (error) => console.error("Error processing request:", error),
      });

    return () => {
      subscription.unsubscribe();
    };
  }, dependencies);

  const addToQueue = (params: RequestParams) => {
    requestQueue$.next(params);
  };

  return { addToQueue };
};

export default useRequestQueueWithRxJS;

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

1 participant