强曰为道

与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

14 函数式并发

14 函数式并发

“函数式编程让并发变得简单——没有共享可变状态,就没有竞态条件。”


14.1 为什么 FP 天然适合并发

函数式编程的不可变性和纯函数特性,消除了并发编程中最常见的问题。

14.1.1 并发问题对照

| 问题 | 命令式 | 函数式 |
|---|---|---|
| 竞态条件 | 共享可变状态 + 锁 | 不可变数据,无竞争 |
| 死锁 | 多个锁的加锁顺序问题 | 无锁/最少锁 |
| 数据竞争 | 需要仔细同步 | 纯函数天然安全 |
| 一致性 | 需要事务管理 | 引用透明保证一致性 |

14.2 不可变数据与并发

14.2.1 共享不可变数据

// Immutable data can be shared between threads safely: no reader can observe
// a half-finished mutation because there are no mutations.
// Object.freeze is shallow; this object is flat, so that is enough here.
const configFields = {
  apiUrl: 'https://api.example.com',
  timeout: 5000,
  retries: 3,
};
const config = Object.freeze(configFields);

// postMessage delivers a structured clone of `config` to the worker, so the
// worker reads its own copy rather than the main thread's object.
const worker = new Worker('worker.js');
worker.postMessage(config);
// Rust 的 Send + Sync trait
use std::sync::Arc;
use std::thread;

// Rust only allows `let` and `for` statements inside a function body, so the
// example lives in `fn main`.
fn main() {
    // `Arc` gives every thread shared ownership of one immutable Vec.
    let data = Arc::new(vec![1, 2, 3, 4, 5]);

    let handles: Vec<_> = (0..4)
        .map(|i| {
            // Each thread receives its own Arc handle (a refcount bump), not a copy of the Vec.
            let data = Arc::clone(&data);
            thread::spawn(move || {
                // Read-only access to immutable data: no lock is needed.
                let sum: i32 = data.iter().sum();
                println!("Thread {}: sum = {}", i, sum);
            })
        })
        .collect();

    // Wait for every worker; unwrap re-raises a worker panic in the main thread.
    for h in handles {
        h.join().unwrap();
    }
}

14.3 Actor 模型

Actor 模型将计算抽象为 Actor——每个 Actor 拥有私有状态,通过消息传递通信。

14.3.1 Actor 特性

| 特性 | 说明 |
|---|---|
| 封装 | 每个 Actor 有私有状态,不共享 |
| 消息传递 | Actor 之间通过异步消息通信 |
| 位置透明 | Actor 可以在本地或远程 |
| 容错 | Supervisor 监控子 Actor 的生命周期 |

14.3.2 Erlang/Elixir 示例

%% Erlang actor: a process whose entire state is the argument of its receive
%% loop; other processes interact with it only by sending messages.
-module(counter).
-export([start/0, increment/1, get/1, loop/1]).

%% Spawn a counter process starting from 0 and return its pid.
start() ->
    spawn(fun() -> loop(0) end).

%% Fire-and-forget: ask the counter to add one. Returns the message sent.
increment(Counter) ->
    Counter ! increment.

%% Synchronous query: send our own pid, then wait (no timeout) for the reply
%% tagged with the counter's pid.
get(Counter) ->
    Counter ! {self(), get},
    receive
        {Counter, Value} ->
            Value
    end.

%% Server loop: the current count is threaded through the recursion instead
%% of being kept in a mutable variable.
loop(N) ->
    receive
        increment ->
            loop(N + 1);
        {Caller, get} ->
            Caller ! {self(), N},
            loop(N)
    end.
# Elixir GenServer: the same counter actor, with OTP running the receive loop.
defmodule Counter do
  use GenServer

  ## Client API (called from other processes)

  @doc "Starts a counter process linked to the caller, beginning at `initial_count`."
  def start_link(initial_count \\ 0), do: GenServer.start_link(__MODULE__, initial_count)

  @doc "Asynchronously adds one to the counter (a cast: no reply is awaited)."
  def increment(pid), do: GenServer.cast(pid, :increment)

  @doc "Synchronously returns the current count (a call: waits for the reply)."
  def get(pid), do: GenServer.call(pid, :get)

  ## Server callbacks (run inside the GenServer process)

  @impl true
  def init(initial), do: {:ok, initial}

  @impl true
  def handle_cast(:increment, current), do: {:noreply, current + 1}

  @impl true
  def handle_call(:get, _from, current), do: {:reply, current, current}
end

# Usage
{:ok, counter} = Counter.start_link(0)
Counter.increment(counter)
Counter.increment(counter)
Counter.get(counter)  # 2

14.3.3 JavaScript Actor 模拟

// Minimal actor: messages are queued in a mailbox and handled one at a time,
// so `behavior` never runs concurrently with itself for the same actor.
const createActor = (behavior) => {
  const mailbox = [];
  let processing = false;

  const self = {
    // Enqueue a message and start draining if no drain loop is active.
    send: (message) => {
      mailbox.push(message);
      if (!processing) processMailbox();
    }
  };

  // Drain the mailbox sequentially. A message whose handler throws or rejects
  // is reported and skipped. Without this, the rejection would escape, leave
  // `processing` stuck at true and silently wedge the actor for good.
  async function processMailbox() {
    processing = true;
    while (mailbox.length > 0) {
      const message = mailbox.shift();
      try {
        await behavior(message, self);
      } catch (err) {
        console.error('Actor failed to handle message:', message, err);
      }
    }
    processing = false;
  }

  return self;
};

// Usage
// Receives COUNT replies and prints them. It must exist before anyone sends it a reply.
const printerActor = createActor(async (msg) => {
  if (msg.type === 'COUNT') console.log('Count:', msg.value);
});

// NOTE: for brevity the counter keeps its state on the public `self` object;
// a stricter actor would hide it in the closure.
const counterActor = createActor(async (msg, self) => {
  if (msg.type === 'INCREMENT') {
    self.state = (self.state || 0) + 1;
  } else if (msg.type === 'GET') {
    msg.replyTo.send({ type: 'COUNT', value: self.state || 0 });
  }
});

counterActor.send({ type: 'INCREMENT' });
counterActor.send({ type: 'INCREMENT' });
counterActor.send({ type: 'GET', replyTo: printerActor });

14.4 STM(Software Transactional Memory)

STM 提供了一种事务性的内存访问机制,类似数据库事务但用于内存数据。

14.4.1 Haskell STM

import Control.Concurrent.STM

-- | An account balance held in a transactional variable.
type Account = TVar Int

-- | Move @amount@ from @from@ to @to@ as one atomic transaction.
--
-- If @from@ holds less than @amount@, 'check' calls 'retry'. The transaction
-- is abandoned and re-run only after a 'TVar' it read changes, so 'atomically'
-- blocks until funds arrive. Compose with 'orElse' to fail fast instead.
transfer :: Account -> Account -> Int -> STM ()
transfer from to amount = do
  balance <- readTVar from
  check (balance >= amount)
  writeTVar from (balance - amount)
  -- Strict variant: avoid accumulating an unevaluated (+ amount) thunk in the TVar.
  modifyTVar' to (+ amount)

-- | Demo: move 300 from Alice to Bob, then read both balances together.
main :: IO ()
main = do
  alice <- newTVarIO 1000
  bob   <- newTVarIO 500

  -- The whole transfer commits atomically, or not at all.
  atomically (transfer alice bob 300)

  -- Both reads happen inside one transaction, giving a consistent snapshot.
  (a, b) <- atomically $ (,) <$> readTVar alice <*> readTVar bob

  putStrLn ("Alice: " ++ show a ++ ", Bob: " ++ show b)
  -- Alice: 700, Bob: 800

14.4.2 Clojure STM

;; Clojure STM: refs may only be changed inside a (dosync ...) transaction.
(def account-a (ref 1000))
(def account-b (ref 500))

(defn transfer
  "Atomically moves `amount` from ref `from` to ref `to`.
  Throws, aborting the whole transaction, when `from` holds less than `amount`."
  [from to amount]
  (dosync
    (if (< @from amount)
      (throw (Exception. "Insufficient funds"))
      (do (alter from - amount)
          (alter to + amount)))))

(transfer account-a account-b 300)

@account-a  ;; 700
@account-b  ;; 800

14.5 CSP(Communicating Sequential Processes)

CSP 模型使用通道(Channel)进行协程间通信。

14.5.1 Go 的 CSP

package main

import "fmt"

// producer sends 0..9 on ch, then closes it; closing is what ends the consumer's loop.
func producer(ch chan<- int) {
    defer close(ch)
    for n := 0; n < 10; n++ {
        ch <- n
    }
}

// consumer prints every value until ch is closed, then signals on done.
func consumer(ch <-chan int, done chan<- struct{}) {
    for {
        v, ok := <-ch
        if !ok {
            break
        }
        fmt.Printf("Received: %d\n", v)
    }
    done <- struct{}{}
}

func main() {
    values := make(chan int)
    finished := make(chan struct{})

    go producer(values)
    go consumer(values, finished)

    // Block until the consumer reports it has drained the channel.
    <-finished
}

14.5.2 Clojure core.async

(require '[clojure.core.async :as async :refer [go chan <! >! <!! >!!]])

;; Buffered channel: up to 10 puts complete without a waiting taker.
(def ch (chan 10))

;; Producer: a go block that parks on each put, then closes the channel.
(go
  (dotimes [i 10]
    (>! ch i))
  (async/close! ch))

;; Consumer: keeps taking until the channel is closed (a take then yields nil).
(async/go-loop []
  (when-let [v (<! ch)]
    (println "Received:" v)
    (recur)))

;; Pipeline: pipe copies every value from input to output
;; (and, by default, closes output when input closes).
(def input (chan))
(def output (chan))

(async/pipe input output)

(go (>! input 42))
(<!! output)  ;; 42

14.5.3 JavaScript 中的 CSP

// Using the js-csp library: CSP channels driven by generator functions.
const { chan, go, put, take, alts, CLOSED } = require('js-csp');

// Unbuffered channel: each put waits for a matching take.
const ch = chan();

// Producer
go(function* () {
  for (let i = 0; i < 10; i++) {
    yield put(ch, i);
  }
  ch.close();
});

// Consumer. Taking from a closed channel yields CLOSED (null in js-csp), never
// undefined, so the loop must compare against CLOSED or it never terminates.
go(function* () {
  let value;
  while ((value = yield take(ch)) !== CLOSED) {
    console.log('Received:', value);
  }
});

14.6 并行计算

14.6.1 Haskell 并行策略

-- This snippet defines its own simplified parMap/parList. Hiding the library
-- versions avoids "ambiguous occurrence" errors at every use site.
import Control.Parallel.Strategies hiding (parList, parMap)

-- | @map f xs@ with one spark per element, each result forced to WHNF
-- ('rseq'). A simplified variant of the library's 'parMap' with a fixed strategy.
parMap :: (a -> b) -> [a] -> [b]
parMap f xs = map f xs `using` parList rseq

-- | Parallel sum of squares.
main :: IO ()
main = do
  -- Use 10^6, not 10^7. The sum of squares up to 10^7 (about 3.3e20) overflows
  -- a 64-bit Int and would print a silently wrapped result.
  let xs = [1 .. 1000000] :: [Int]
      -- The strategy must wrap the *list* of squares, not the final Int.
      -- Chunking gives each spark 10000 elements of work instead of one.
      squares = map (^ (2 :: Int)) xs `using` parListChunk 10000 rdeepseq
  print (sum squares)

-- | Spark the evaluation of each element (under @strat@) while walking the
-- list spine sequentially; the same shape as the library's 'parList'.
parList :: Strategy a -> Strategy [a]
parList _ [] = pure []
parList strat (x : xs) = do
  x' <- rpar (x `using` strat)
  xs' <- parList strat xs
  pure (x' : xs')

14.6.2 Rust 并行迭代器

use rayon::prelude::*;

fn main() {
    let data: Vec<i32> = (1..=10_000_000).collect();

    // Sequential. The sum of squares up to 10^7 is about 3.3e20, which
    // overflows i64 (a panic in debug builds, silent wraparound in release),
    // so accumulate in i128.
    let sum1: i128 = data.iter().map(|&x| i128::from(x) * i128::from(x)).sum();

    // Parallel: the only change is iter() -> par_iter().
    let sum2: i128 = data.par_iter().map(|&x| i128::from(x) * i128::from(x)).sum();

    assert_eq!(sum1, sum2);

    // Parallel in-place sort.
    let mut data = vec![5, 3, 1, 4, 2];
    data.par_sort();
    assert_eq!(data, vec![1, 2, 3, 4, 5]);
}

14.6.3 Python 并行

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from functools import partial

# Parallel map
def parallel_map(fn, items, max_workers=4, executor_cls=ProcessPoolExecutor):
    """Apply ``fn`` to every element of ``items`` in parallel, preserving order.

    Args:
        fn: Function to apply. With the default process pool it must be
            picklable, i.e. defined at module top level (not a lambda).
        items: Iterable of inputs.
        max_workers: Maximum number of concurrent workers.
        executor_cls: Executor class to use. ``ProcessPoolExecutor`` (the
            default) sidesteps the GIL for CPU-bound work; pass
            ``ThreadPoolExecutor`` for I/O-bound work or unpicklable callables.

    Returns:
        A list of ``fn(item)`` results, in the same order as ``items``.
        An exception raised by ``fn`` propagates to the caller.
    """
    with executor_cls(max_workers=max_workers) as executor:
        return list(executor.map(fn, items))


# Usage
def heavy_computation(x):
    """CPU-bound toy workload: the sum of ``i * i`` for ``i`` in ``range(x)``."""
    return sum(i * i for i in range(x))


# The guard is required. Under the "spawn" start method (the default on Windows
# and macOS), every worker process re-imports this module, and an unguarded
# call would try to start a new pool from inside each worker.
if __name__ == "__main__":
    results = parallel_map(heavy_computation, [1000000] * 8)

14.7 异步编程

14.7.1 函数式异步

// Task monad: like a Promise, but lazy. Nothing runs until fork() is called,
// so a Task is a reusable description of an async computation.
class Task {
  /**
   * @param {(reject: Function, resolve: Function) => void} fork
   *   Starts the computation and reports through one of the two callbacks.
   */
  constructor(fork) {
    this.fork = fork;
  }

  /** A Task that resolves with `value`. */
  static of(value) {
    return new Task((reject, resolve) => resolve(value));
  }

  /** A Task that rejects with `error`. */
  static rejected(error) {
    return new Task((reject, resolve) => reject(error));
  }

  /** Transform the success value; rejections pass through unchanged. */
  map(fn) {
    return new Task((reject, resolve) =>
      this.fork(reject, (value) => resolve(fn(value)))
    );
  }

  /** Chain a Task-returning function (monadic bind). */
  flatMap(fn) {
    return new Task((reject, resolve) =>
      this.fork(reject, (value) => fn(value).fork(reject, resolve))
    );
  }

  /**
   * Wrap a Promise-producing computation.
   * Pass a thunk (`() => promise`) to keep the Task lazy: the thunk runs only
   * when the Task is forked. A ready-made Promise is still accepted for
   * backward compatibility, but its work has already started.
   */
  static fromPromise(promiseOrThunk) {
    return new Task((reject, resolve) => {
      let promise;
      try {
        promise = typeof promiseOrThunk === 'function' ? promiseOrThunk() : promiseOrThunk;
      } catch (err) {
        // A thunk that throws synchronously is reported as a rejection.
        reject(err);
        return;
      }
      // then(onOk, onErr), not then().catch(): an exception thrown inside a
      // downstream `resolve` handler must not also trigger `reject`.
      Promise.resolve(promise).then(resolve, reject);
    });
  }
}

// Usage: the requests start only when program.fork(...) runs.
const fetchUser = (id) =>
  Task.fromPromise(() => fetch(`/api/users/${id}`).then(r => r.json()));

const fetchPosts = (userId) =>
  Task.fromPromise(() => fetch(`/api/users/${userId}/posts`).then(r => r.json()));

const program = fetchUser(1)
  .flatMap(user => fetchPosts(user.id)
    .map(posts => ({ user, posts })));

program.fork(
  error => console.error(error),
  result => console.log(result)
);

14.8 业务场景

14.8.1 微服务并发调用

// Call several independent microservices concurrently. Promise.all rejects as
// soon as any single request (or its JSON parsing) fails.
const fetchAllServices = async (userId) => {
  const fetchJson = (url) => fetch(url).then((response) => response.json());

  const requests = [
    fetchJson(`/api/users/${userId}`),
    fetchJson(`/api/orders?userId=${userId}`),
    fetchJson(`/api/recommendations/${userId}`),
  ];
  const [user, orders, recommendations] = await Promise.all(requests);

  return { user, orders, recommendations };
};

// Bounded concurrency: run `tasks` (functions returning promises) with at most
// `limit` in flight, resolving to their results in task order.
// If a task rejects, the returned promise rejects. Workers that have not failed
// keep draining the remaining tasks.
const parallelLimit = (tasks, limit) => {
  const results = [];
  let index = 0;

  // Each worker repeatedly claims the next unclaimed index. JS runs this code
  // on a single thread, so no two workers can claim the same `index++`.
  const worker = async () => {
    while (index < tasks.length) {
      const i = index++;
      results[i] = await tasks[i]();
    }
  };

  // Clamp the worker count to [1, tasks.length]. A limit of 0, a negative
  // number or NaN would otherwise start no workers and resolve to [] without
  // running any task. Workers beyond tasks.length would only exit immediately.
  const workerCount = Math.max(1, Math.min(Math.floor(limit) || 1, tasks.length));

  return Promise.all(
    Array.from({ length: workerCount }, () => worker())
  ).then(() => results);
};

14.8.2 消息队列处理

import asyncio
from asyncio import Queue

async def producer(queue: Queue, items, num_consumers: int = 1):
    """Put every item on ``queue``, then one ``None`` sentinel per consumer.

    Each consumer stops after taking a single sentinel, so ``num_consumers``
    sentinels are needed for all of them to exit. With only one, the other
    consumers would wait on ``queue.get()`` forever.
    """
    for item in items:
        await queue.put(item)  # waits while a bounded queue is full (backpressure)
        await asyncio.sleep(0.1)  # simulate production latency
    for _ in range(num_consumers):
        await queue.put(None)  # sentinel: no more work


async def consumer(queue: Queue, name: str):
    """Process items from ``queue`` until a ``None`` sentinel arrives."""
    while True:
        item = await queue.get()
        if item is None:
            break
        print(f"{name} processing: {item}")
        await asyncio.sleep(0.2)  # simulate processing
    print(f"{name} done")


async def main():
    """Run one producer feeding three consumers through a bounded queue."""
    num_workers = 3
    queue = asyncio.Queue(maxsize=10)

    items = list(range(20))
    producers = [asyncio.create_task(producer(queue, items, num_consumers=num_workers))]
    consumers = [
        asyncio.create_task(consumer(queue, f"Worker-{i}"))
        for i in range(num_workers)
    ]

    await asyncio.gather(*producers)
    await asyncio.gather(*consumers)


# Guarded so importing this module does not start the demo.
if __name__ == "__main__":
    asyncio.run(main())

14.9 注意事项

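| 注意事项 | 说明 |
|---|---|
| 死锁预防 | Actor 模型和 CSP 避免了因加锁顺序不一致导致的传统死锁,但相互同步等待(如两个 GenServer 互相 `call`、无缓冲通道两端互等)仍可能死锁 |
| 背压处理 | 高速生产者需要背压机制 |
| 错误隔离 | 使用 Supervisor 监控并发任务 |
| 共享状态 | 尽量减少共享状态,使用消息传递 |
| 性能调优 | 并发数、队列大小需要根据场景调优 |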

14.10 小结

| 要点 | 说明 |
|---|---|
| FP 优势 | 不可变性天然适合并发 |
| Actor 模型 | 封装状态 + 消息传递(Erlang/Elixir) |
| STM | 事务性内存,原子性保证 |
| CSP | 通道通信(Go, Clojure core.async) |
| 并行计算 | par_iter, parMap 等并行化工具 |

扩展阅读

  1. Programming Erlang — Joe Armstrong
  2. Clojure for the Brave and True — Ch 9–11(并发与并行、Atom/Ref/Var、core.async)
  3. Software Transactional Memory — Microsoft Research
  4. Rayon - Rust 并行迭代器

下一章:15 函数式错误处理