强曰为道

与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

12 函数式响应式编程

12 函数式响应式编程

“FRP 把事件流当作一等值来处理——你可以像操作数组一样操作事件。”


12.1 概述

函数式响应式编程(Functional Reactive Programming, FRP) 将时间维度引入函数式编程,用流(Stream)和信号(Signal)来表示随时间变化的值。

12.1.1 命令式 vs 响应式

方面命令式响应式
数据流手动更新自动传播
状态管理可变变量不可变流
事件处理回调函数声明式流
并发手动管理操作符处理

12.1.2 FRP 的两大学派

学派代表特点
经典 FRPFran, Elm (早期)连续时间信号/行为
反应式编程RxJS, Reactor离散事件流

12.2 信号与行为(Signals/Behaviors)

12.2.1 经典 FRP 模型

行为(Behavior) 是随时间连续变化的值:

Behavior a = Time → a

事件(Event) 是离散发生的值:

Event a = [(Time, a)]

Elm 风格:

-- 行为:鼠标位置随时间变化
mousePos : Behavior (Int, Int)
mousePos = Mouse.position

-- 事件:鼠标点击
clicks : Event ()
clicks = Mouse.clicks

-- 组合:点击时的鼠标位置
clickPositions : Event (Int, Int)
clickPositions = sampleOn clicks mousePos

-- 转换
countClicks : Behavior Int
countClicks = foldp (\_ acc -> acc + 1) 0 clicks

12.3 响应式流(Reactive Streams)

12.3.1 Observable 模型

Observable 是最流行的响应式流实现。

核心概念:

概念说明
Observable可观察的数据流,发出多个值
Observer消费 Observable 发出的值
Subscription连接 Observable 和 Observer
Operator变换、组合流的函数

12.3.2 RxJS 示例

import { fromEvent, interval, merge, combineLatest } from 'rxjs';
import { map, filter, scan, debounceTime, switchMap, takeUntil } from 'rxjs/operators';

// 基本流
const numbers$ = from([1, 2, 3, 4, 5]);

// 事件流
const clicks$ = fromEvent(document, 'click');
const keys$ = fromEvent(document, 'keydown');

// 变换
const positions$ = clicks$.pipe(
  map(e => ({ x: e.clientX, y: e.clientY }))
);

// 过滤
const enterKeys$ = keys$.pipe(
  filter(e => e.key === 'Enter')
);

// 累积
const clickCount$ = clicks$.pipe(
  scan((count, _) => count + 1, 0)
);

// 防抖
const searchInput$ = fromEvent(searchBox, 'input').pipe(
  debounceTime(300),
  map(e => e.target.value),
  filter(query => query.length > 2)
);

// 搜索请求(取消前一个)
const results$ = searchInput$.pipe(
  switchMap(query => fetch(`/api/search?q=${query}`).then(r => r.json()))
);

// 订阅
results$.subscribe(results => {
  renderResults(results);
});

12.3.3 自动补全示例

// 经典应用:搜索自动补全
const searchBox = document.getElementById('search');
const results = document.getElementById('results');

const search$ = fromEvent(searchBox, 'input').pipe(
  // 取输入值
  map(e => e.target.value.trim()),
  // 防抖 300ms
  debounceTime(300),
  // 至少 2 个字符
  filter(query => query.length >= 2),
  // 去重
  distinctUntilChanged(),
  // 发起请求(取消前一个未完成的)
  switchMap(query =>
    from(fetch(`/api/search?q=${encodeURIComponent(query)}`).then(r => r.json())).pipe(
      // 错误处理:返回空结果
      catchError(() => of([]))
    )
  )
);

search$.subscribe(data => {
  results.innerHTML = data.map(item =>
    `<div class="result">${item.title}</div>`
  ).join('');
});

12.4 流操作符

12.4.1 创建操作符

操作符说明RxJS
of从值创建of(1, 2, 3)
from从数组/可迭代创建from([1, 2, 3])
interval定时创建interval(1000)
fromEvent从事件创建fromEvent(el, 'click')
create自定义创建new Observable(subscriber => {...})

12.4.2 变换操作符

操作符说明
map转换每个值
flatMap / mergeMap展平内部 Observable
switchMap切换到新的内部 Observable(取消旧的)
concatMap顺序执行内部 Observable
scan累积归约
buffer收集值到数组

12.4.3 过滤操作符

操作符说明
filter根据谓词过滤
take取前 N 个值
skip跳过前 N 个值
distinctUntilChanged去重连续相同值
debounceTime防抖
throttleTime节流
first / last取第一个/最后一个

12.4.4 组合操作符

操作符说明
merge合并多个流
concat顺序连接流
combineLatest最新值组合
zip按索引配对
forkJoin等待所有完成
import { merge, combineLatest, zip } from 'rxjs';

// merge:任一流发出值都传播
const merged$ = merge(mouseClicks$, keyPresses$);

// combineLatest:任一流发出时,取其他流的最新值
const fullName$ = combineLatest([firstName$, lastName$]).pipe(
  map(([first, last]) => `${first} ${last}`)
);

// zip:严格配对
const paired$ = zip(names$, ages$).pipe(
  map(([name, age]) => ({ name, age }))
);

12.5 错误处理

import { of, EMPTY } from 'rxjs';
import { catchError, retry, retryWhen, delay } from 'rxjs/operators';

// 基本错误处理
const safeRequest$ = fetch('/api/data').pipe(
  catchError(error => {
    console.error('Request failed:', error);
    return of({ data: [], error: error.message });
  })
);

// 重试
const resilientRequest$ = fetch('/api/data').pipe(
  retry(3)
);

// 指数退避重试
const backoffRequest$ = fetch('/api/data').pipe(
  retryWhen(errors => errors.pipe(
    scan((retryCount, error) => {
      if (retryCount >= 3) throw error;
      return retryCount + 1;
    }, 0),
    delay(retryCount => Math.pow(2, retryCount) * 1000)
  ))
);

// 错误恢复
const requestWithFallback$ = primaryRequest$.pipe(
  catchError(() => fallbackRequest$),
  catchError(() => of(defaultValue))
);

12.6 状态管理

12.6.1 Redux 的流视角

// Redux 本质上是一个流
import { Subject } from 'rxjs';
import { scan, shareReplay } from 'rxjs/operators';

const createStore = (reducer, initialState) => {
  const actions$ = new Subject();
  const state$ = actions$.pipe(
    scan(reducer, initialState),
    shareReplay(1)
  );
  return {
    dispatch: (action) => actions$.next(action),
    select: (selector) => state$.pipe(map(selector)),
    state$,
  };
};

// 使用
const store = createStore(todoReducer, []);

store.select(state => state.filter(t => !t.completed)).subscribe(
  activeTodos => render(activeTodos)
);

store.dispatch({ type: 'ADD_TODO', text: 'Learn FRP' });

12.6.2 Elm Architecture

// Elm Architecture: Model → Update → View
const ElmApp = (init, update, view) => {
  const actions$ = new Subject();

  const model$ = actions$.pipe(
    scan((model, action) => update(model, action), init)
  );

  const vdom$ = model$.pipe(
    map(model => view(model, (action) => actions$.next(action)))
  );

  return { model$, vdom$ };
};

12.7 Haskell 中的 FRP

-- 使用 reflex 库
import Reflex

-- 基本 FRP
example :: Widget t m => m ()
example = do
  -- 创建事件
  rec textInput <- textInput def
      let keypressEvent = _textInput_keypress textInput

  -- 行为:累积按键次数
  count <- foldDyn (+) 0 (1 <$ keypressEvent)

  -- 显示
  display count

-- 动态文本
  dynText $ fmap show count

12.8 业务场景

12.8.1 实时仪表板

// 实时数据仪表板
const dashboard$ = combineLatest({
  cpu: interval(1000).pipe(switchMap(() => fetchMetric('cpu'))),
  memory: interval(2000).pipe(switchMap(() => fetchMetric('memory'))),
  requests: fromEventSource('/api/metrics/requests'),
  errors: fromEventSource('/api/metrics/errors').pipe(
    bufferTime(5000),
    filter(errors => errors.length > 0)
  )
}).pipe(
  // 状态转换
  scan((dashboard, metrics) => ({
    ...dashboard,
    ...metrics,
    lastUpdated: Date.now(),
    alerts: [
      ...dashboard.alerts,
      ...(metrics.cpu > 90 ? ['High CPU'] : []),
      ...(metrics.errors.length > 10 ? ['Error spike'] : []),
    ]
  }), { alerts: [] })
);

dashboard$.subscribe(renderDashboard);

12.8.2 表单验证

const formValidation$ = (form) => {
  const name$ = fromEvent(form.name, 'input').pipe(
    map(e => e.target.value),
    map(name => ({
      value: name,
      valid: name.length >= 2,
      error: name.length < 2 ? 'Name too short' : null
    }))
  );

  const email$ = fromEvent(form.email, 'input').pipe(
    map(e => e.target.value),
    map(email => ({
      value: email,
      valid: /^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(email),
      error: !/^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(email) ? 'Invalid email' : null
    }))
  );

  const age$ = fromEvent(form.age, 'input').pipe(
    map(e => parseInt(e.target.value)),
    map(age => ({
      value: age,
      valid: age >= 18 && age <= 120,
      error: age < 18 ? 'Must be 18+' : age > 120 ? 'Invalid age' : null
    }))
  );

  return combineLatest([name$, email$, age$]).pipe(
    map(([name, email, age]) => ({
      fields: { name, email, age },
      valid: name.valid && email.valid && age.valid
    })),
    distinctUntilChanged((a, b) => a.valid === b.valid)
  );
};

12.9 注意事项

注意事项说明
内存泄漏忘记取消订阅会导致内存泄漏
调试困难流的异步性质使调试复杂
过度使用简单场景不需要 FRP
背压处理高速数据流需要背压策略
测试使用 TestScheduler 进行虚拟时间测试

12.10 小结

要点说明
FRP函数式 + 响应式,流是一等公民
Signal/Behavior连续时间变化的值
Observable离散事件流,支持丰富的操作符
操作符创建、变换、过滤、组合、错误处理
状态管理Elm Architecture、Redux 的流视角

扩展阅读

  1. RxJS 文档 — 官方文档
  2. The Introduction to Reactive Programming You’ve Been Missing — André Staltz
  3. Elm Architecture — Elm 官方教程
  4. Reflex FRP — Haskell FRP 库

下一章13 函数式设计模式