14 函数式并发
14 函数式并发
“函数式编程让并发变得简单——没有共享可变状态,就没有竞态条件。”
14.1 为什么 FP 天然适合并发
函数式编程的不可变性和纯函数特性,消除了并发编程中最常见的一类问题:对共享可变状态的竞争访问。
14.1.1 并发问题对照
| 问题 | 命令式 | 函数式 |
|---|---|---|
| 竞态条件 | 共享可变状态 + 锁 | 不可变数据,无竞争 |
| 死锁 | 多个锁的顺序问题 | 无锁/最少锁 |
| 数据竞争 | 需要仔细同步 | 纯函数天然安全 |
| 一致性 | 需要事务管理 | 引用透明保证一致性 |
14.2 不可变数据与并发
14.2.1 共享不可变数据
// Immutable data can be handed to other threads safely.
// Object.freeze is shallow: it locks only this object's own properties.
const config = Object.freeze({
  apiUrl: 'https://api.example.com',
  timeout: 5000,
  retries: 3,
});

// A Web Worker can read it safely: postMessage delivers a structured clone.
const worker = new Worker('worker.js');
worker.postMessage(config);
// Rust 的 Send + Sync trait
use std::sync::Arc;
use std::thread;
// Shared, read-only vector; every thread gets its own Arc handle to it.
let data = Arc::new(vec![1, 2, 3, 4, 5]);
let mut handles = Vec::with_capacity(4);
for i in 0..4 {
    let shared = Arc::clone(&data);
    handles.push(thread::spawn(move || {
        // The data is never mutated, so concurrent reads are safe.
        let sum: i32 = shared.iter().sum();
        println!("Thread {}: sum = {}", i, sum);
    }));
}
// Wait for every worker; a panicked thread makes unwrap() panic here.
for handle in handles {
    handle.join().unwrap();
}
14.3 Actor 模型
Actor 模型将计算抽象为 Actor——每个 Actor 拥有私有状态,通过消息传递通信。
14.3.1 Actor 特性
| 特性 | 说明 |
|---|---|
| 封装 | 每个 Actor 有私有状态,不共享 |
| 消息传递 | Actor 之间通过异步消息通信 |
| 位置透明 | Actor 可以在本地或远程 |
| 容错 | Supervisor 监控子 Actor 的生命周期 |
14.3.2 Erlang/Elixir 示例
%% Counter actor implemented as a plain Erlang process.
-module(counter).
-export([start/0, increment/1, get/1, loop/1]).

%% Spawns a counter process whose count starts at 0; returns its pid.
start() ->
    spawn(fun() -> loop(0) end).

%% Asynchronously asks the counter to add one.
increment(Pid) ->
    Pid ! increment.

%% Asks the counter for its current value and waits for the tagged reply.
get(Pid) ->
    Caller = self(),
    Pid ! {Caller, get},
    receive
        {Pid, Value} -> Value
    end.

%% Server loop: the count is carried as the argument of the tail call.
loop(Count) ->
    receive
        {Client, get} ->
            Client ! {self(), Count},
            loop(Count);
        increment ->
            loop(Count + 1)
    end.
# Counter built on GenServer; the server state is the current count.
defmodule Counter do
  use GenServer

  # --- Client API ---

  # Starts a linked counter whose state begins at initial_count (default 0).
  def start_link(initial_count \\ 0),
    do: GenServer.start_link(__MODULE__, initial_count)

  # Fire-and-forget request to add one to the count.
  def increment(pid), do: GenServer.cast(pid, :increment)

  # Synchronous request that returns the current count.
  def get(pid), do: GenServer.call(pid, :get)

  # --- Server callbacks ---

  @impl true
  def init(count) do
    {:ok, count}
  end

  @impl true
  def handle_cast(:increment, count), do: {:noreply, count + 1}

  @impl true
  def handle_call(:get, _from, count), do: {:reply, count, count}
end
# Usage: start at 0, send two increments, then read the value back.
{:ok, counter} = Counter.start_link(0)
:ok = Counter.increment(counter)
:ok = Counter.increment(counter)
Counter.get(counter) # 2
14.3.3 JavaScript Actor 模拟
// Simplified actor system: a mailbox plus a behavior that handles one
// message at a time.
//
// behavior(message, self) may be synchronous or async. Messages are handled
// strictly in arrival order; the next one starts only after the previous one
// has settled.
// onError(error, message) is called when behavior throws or rejects. It
// defaults to logging the error with console.error.
// Returns the actor handle `{ send }`.
const createActor = (behavior, onError = (error) => console.error(error)) => {
  const mailbox = [];
  let processing = false;

  const self = {
    // Enqueue a message and start draining the mailbox if it is idle.
    send: (message) => {
      mailbox.push(message);
      if (!processing) processMailbox();
    }
  };

  async function processMailbox() {
    processing = true;
    try {
      while (mailbox.length > 0) {
        const message = mailbox.shift();
        try {
          await behavior(message, self);
        } catch (error) {
          // One failing message must not wedge the actor: report it and
          // keep draining the mailbox.
          onError(error, message);
        }
      }
    } finally {
      // Always release the flag, otherwise later send() calls would only
      // enqueue and never be processed.
      processing = false;
    }
  }

  return self;
};
// Usage
// Counter actor: INCREMENT bumps the count, GET replies to msg.replyTo.
// The count is kept on the actor handle itself (self.state).
const counterActor = createActor(async (msg, self) => {
  if (msg.type === 'INCREMENT') {
    self.state = (self.state || 0) + 1;
  } else if (msg.type === 'GET') {
    msg.replyTo.send({ type: 'COUNT', value: self.state || 0 });
  }
});

// Printer actor: receives COUNT replies and logs them. It must exist before
// the GET message below references it as replyTo.
const printerActor = createActor(async (msg) => {
  if (msg.type === 'COUNT') {
    console.log('Count:', msg.value);
  }
});

counterActor.send({ type: 'INCREMENT' });
counterActor.send({ type: 'INCREMENT' });
counterActor.send({ type: 'GET', replyTo: printerActor });
14.4 STM(Software Transactional Memory)
STM 提供了一种事务性的内存访问机制,类似数据库事务但用于内存数据。
14.4.1 Haskell STM
import Control.Concurrent.STM
-- | An account balance held in a transactional variable.
type Account = TVar Int

-- | Move @amount@ from account @from@ to account @to@ in one transaction.
--
-- While the source balance is below @amount@ the transaction blocks: 'check'
-- calls 'retry', which re-runs the transaction only after a 'TVar' it has
-- read is changed by another transaction.
transfer :: Account -> Account -> Int -> STM ()
transfer from to amount = do
  balance <- readTVar from
  check (balance >= amount)
  writeTVar from (balance - amount)
  -- Strict modify so the target balance does not accumulate thunks.
  modifyTVar' to (+ amount)
-- Usage: create two accounts, transfer 300 atomically, print both balances.
main :: IO ()
main = do
  alice <- newTVarIO 1000
  bob <- newTVarIO 500
  -- One atomic transaction.
  atomically (transfer alice bob 300)
  -- Both balances are read inside a single transaction.
  (aliceBalance, bobBalance) <-
    atomically ((,) <$> readTVar alice <*> readTVar bob)
  putStrLn ("Alice: " ++ show aliceBalance ++ ", Bob: " ++ show bobBalance)
  -- Alice: 700, Bob: 800
14.4.2 Clojure STM
;; Clojure STM: account balances live in refs.
(def account-a (ref 1000))
(def account-b (ref 500))

(defn transfer
  "Moves `amount` from ref `from` to ref `to` inside one `dosync` transaction.
  Throws an Exception with message \"Insufficient funds\" when the balance of
  `from` is below `amount`."
  [from to amount]
  (dosync
    (when (< (deref from) amount)
      (throw (Exception. "Insufficient funds")))
    (alter from - amount)
    (alter to + amount)))

(transfer account-a account-b 300)
(deref account-a) ;; 700
(deref account-b) ;; 800
14.5 CSP(Communicating Sequential Processes)
CSP 模型使用通道(Channel)进行协程间通信。
14.5.1 Go 的 CSP
package main
import "fmt"
// producer sends the integers 0 through 9 on ch, in order, and then closes
// the channel so that a receiver ranging over it terminates.
func producer(ch chan<- int) {
	defer close(ch)
	for n := 0; n < 10; n++ {
		ch <- n
	}
}
// consumer prints every value received from ch until ch is closed, then
// signals completion by sending one empty struct on done.
func consumer(ch <-chan int, done chan<- struct{}) {
	for {
		v, ok := <-ch
		if !ok {
			break
		}
		fmt.Printf("Received: %d\n", v)
	}
	done <- struct{}{}
}
// main wires one producer to one consumer over an unbuffered channel and
// blocks until the consumer reports that it has finished.
func main() {
	var (
		ch   = make(chan int)
		done = make(chan struct{})
	)
	go producer(ch)
	go consumer(ch, done)
	<-done // wait for completion
}
14.5.2 Clojure core.async
(require '[clojure.core.async :as async :refer [go chan <! >! <!! >!!]])
;; Create a buffered channel (buffer size 10)
(def ch (chan 10))
;; Producer (go block): puts 0..9 on ch, then closes it
(go
(doseq [i (range 10)]
(>! ch i))
(async/close! ch))
;; Consumer: <! yields nil once ch is closed and drained, which ends the loop
(go
(loop []
(when-let [v (<! ch)]
(println "Received:" v)
(recur))))
;; Pipe: values put on input are forwarded to output
(def input (chan))
(def output (chan))
(async/pipe input output)
(go (>! input 42))
(<!! output) ;; 42
14.5.3 JavaScript 中的 CSP
// Uses the js-csp library.
const { chan, go, put, take, alts, CLOSED } = require('js-csp');
const ch = chan();

// Producer: puts 0..9 on the channel, then closes it.
go(function*() {
  for (let i = 0; i < 10; i++) {
    yield put(ch, i);
  }
  ch.close();
});

// Consumer: take() yields the CLOSED sentinel once the channel is closed and
// drained. Comparing against `undefined` would never match and the loop
// would spin forever, so compare against CLOSED.
go(function*() {
  let value;
  while ((value = yield take(ch)) !== CLOSED) {
    console.log('Received:', value);
  }
});
14.6 并行计算
14.6.1 Haskell 并行策略
-- The library's own parList / parMap are hidden because this module defines
-- teaching versions with the same names; without `hiding`, every use of
-- them is an ambiguous occurrence.
import Control.Parallel.Strategies hiding (parList, parMap)

-- | Map @f@ over @xs@, sparking the evaluation of each result element (to
-- weak head normal form) in parallel.
parMap :: (a -> b) -> [a] -> [b]
parMap f xs = map f xs `using` parList rseq

-- | Sum the squares of 1..10,000,000, evaluating the squares in parallel.
main :: IO ()
main = do
  -- Integer, not Int: the total (about 3.3e20) does not fit in 64 bits.
  let xs = [1 .. 10000000] :: [Integer]
      -- The strategy is applied to the list of squares, not to the sum:
      -- `sum ys `using` parList s` would hand a number to a list strategy.
      squares = map (^ (2 :: Int)) xs `using` parList rdeepseq
  print (sum squares)

-- | Parallel list strategy: spark the evaluation of every element with
-- @strat@ and rebuild the list from the sparked elements.
parList :: Strategy a -> Strategy [a]
parList _ [] = return []
parList strat (x : xs) = do
  x' <- rpar (x `using` strat)
  xs' <- parList strat xs
  return (x' : xs')
14.6.2 Rust 并行迭代器
use rayon::prelude::*;
/// Compares a sequential and a rayon-parallel sum of squares, then sorts a
/// small vector in parallel.
fn main() {
    // Widen to i128 before squaring: the sum of squares of 1..=10_000_000 is
    // about 3.3e20, which overflows i64 (panic in debug, wrap in release).
    fn square(x: &i32) -> i128 {
        let wide = i128::from(*x);
        wide * wide
    }

    let data: Vec<i32> = (1..=10_000_000).collect();
    // Sequential
    let sum1: i128 = data.iter().map(square).sum();
    // Parallel: only iter() -> par_iter() changes
    let sum2: i128 = data.par_iter().map(square).sum();
    assert_eq!(sum1, sum2);
    // Parallel sort
    let mut data = vec![5, 3, 1, 4, 2];
    data.par_sort();
}
14.6.3 Python 并行
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from functools import partial
# Parallel map
def parallel_map(fn, items, max_workers=4):
    """Apply ``fn`` to each element of ``items`` using a pool of processes.

    Args:
        fn: Callable applied to every item.
        items: Iterable of inputs.
        max_workers: Size of the process pool (default 4).

    Returns:
        A list with the results of ``executor.map`` over ``items``.
    """
    with ProcessPoolExecutor(max_workers=max_workers) as pool:
        mapped = pool.map(fn, items)
        return [result for result in mapped]
# Usage
def heavy_computation(x):
    """Return the sum of ``i * i`` for every ``i`` in ``range(x)``."""
    return sum(i * i for i in range(x))


# The guard is required for process pools: with the "spawn" and "forkserver"
# start methods each worker re-imports this module, and starting a pool at
# import time would then fail instead of running the work.
if __name__ == "__main__":
    results = parallel_map(heavy_computation, [1000000] * 8)
14.7 异步编程
14.7.1 函数式异步
// Task monad (like a Promise, but lazy and more functional).
// A Task only describes a computation; nothing runs until
// fork(reject, resolve) is called. Note the callback order: the failure
// handler comes first.
class Task {
  // fork: (reject, resolve) => void — starts the computation when invoked.
  constructor(fork) {
    this.fork = fork;
  }

  // Task that resolves with `value` as soon as it is forked.
  static of(value) {
    return new Task((reject, resolve) => resolve(value));
  }

  // Task that rejects with `error` as soon as it is forked.
  static rejected(error) {
    return new Task((reject, resolve) => reject(error));
  }

  // New Task whose success value is fn(value); failures pass through.
  map(fn) {
    return new Task((reject, resolve) =>
      this.fork(reject, (value) => resolve(fn(value)))
    );
  }

  // Sequencing: fn(value) must return a Task, which is forked with the same
  // handlers; failures of either Task go to reject.
  flatMap(fn) {
    return new Task((reject, resolve) =>
      this.fork(reject, (value) => fn(value).fork(reject, resolve))
    );
  }

  // Wraps a promise, or a function returning one, in a Task.
  // Passing a function keeps the Task lazy: the function is called on every
  // fork, so the underlying work starts only then. An existing promise is
  // still accepted, but its work has already started.
  static fromPromise(promiseOrThunk) {
    return new Task((reject, resolve) => {
      let pending;
      try {
        pending =
          typeof promiseOrThunk === 'function' ? promiseOrThunk() : promiseOrThunk;
      } catch (error) {
        reject(error);
        return;
      }
      // Two-argument then(): an exception thrown inside resolve must not
      // also be routed to reject, which .then(resolve).catch(reject) did.
      Promise.resolve(pending).then(resolve, reject);
    });
  }
}
// Usage
// Lazily GETs `url` and parses the JSON body. The request is issued when
// the Task is forked, not when it is constructed. Rejects on network
// failure, on a non-2xx status, or when the body is not valid JSON.
const fetchJson = (url) =>
  new Task((reject, resolve) => {
    fetch(url)
      .then((r) => {
        if (!r.ok) throw new Error(`HTTP ${r.status} for ${url}`);
        return r.json();
      })
      .then(resolve, reject);
  });

const fetchUser = (id) => fetchJson(`/api/users/${id}`);
const fetchPosts = (userId) => fetchJson(`/api/users/${userId}/posts`);

// Describes "load user 1, then that user's posts"; nothing runs yet.
const program = fetchUser(1)
  .flatMap(user => fetchPosts(user.id)
    .map(posts => ({ user, posts })));

// Run it: the failure handler comes first, the success handler second.
program.fork(
  error => console.error(error),
  result => console.log(result)
);
14.8 业务场景
14.8.1 微服务并发调用
// Calls the user, order and recommendation services concurrently and
// resolves with { user, orders, recommendations } once all three JSON bodies
// have been parsed. Rejects as soon as any request or parse fails.
const fetchAllServices = async (userId) => {
  const endpoints = [
    `/api/users/${userId}`,
    `/api/orders?userId=${userId}`,
    `/api/recommendations/${userId}`,
  ];
  const pending = endpoints.map((url) => fetch(url).then((r) => r.json()));
  const [user, orders, recommendations] = await Promise.all(pending);
  return { user, orders, recommendations };
};
// Bounded concurrency: runs `tasks` (functions returning a value or a
// promise) with at most `limit` of them in flight at once.
// Resolves with the results in task order. Rejects with the first failure;
// tasks that have already started are not cancelled.
// Throws RangeError when `limit` is not a number >= 1. Previously such a
// limit started no workers and silently resolved with [] without running
// any task.
const parallelLimit = (tasks, limit) => {
  if (!(limit >= 1)) {
    throw new RangeError(`limit must be a number >= 1, got ${limit}`);
  }
  const results = [];
  let index = 0;
  // Each worker repeatedly claims the next unclaimed task index. The
  // claim (index++) happens synchronously, so no task is run twice.
  const worker = async () => {
    while (index < tasks.length) {
      const i = index++;
      results[i] = await tasks[i]();
    }
  };
  // Never start more workers than there are tasks.
  const workerCount = Math.min(Math.floor(limit), tasks.length);
  return Promise.all(
    Array.from({ length: workerCount }, () => worker())
  ).then(() => results);
};
14.8.2 消息队列处理
import asyncio
from asyncio import Queue
async def producer(queue: Queue, items):
    """Put every element of ``items`` on ``queue``, then one ``None`` sentinel.

    After each item the coroutine sleeps 0.1 s to simulate production delay.
    """
    for entry in items:
        await queue.put(entry)
        await asyncio.sleep(0.1)  # simulated production delay
    await queue.put(None)  # sentinel value
async def consumer(queue: Queue, name: str):
    """Process items from ``queue`` until the ``None`` sentinel is seen.

    Each item is printed and followed by a 0.2 s sleep that simulates work.
    On the sentinel, the consumer puts it back on the queue before exiting,
    so other consumers sharing the queue also stop. Without this, one
    sentinel would stop one consumer and the rest would wait on
    ``queue.get()`` forever. A single ``None`` is left on the queue after the
    last consumer exits.
    """
    while True:
        item = await queue.get()
        if item is None:
            # Forward the shutdown signal to the next waiting consumer.
            await queue.put(None)
            break
        print(f"{name} processing: {item}")
        await asyncio.sleep(0.2)  # simulated processing time
    print(f"{name} done")
async def main():
    """Run one producer and three consumers over a bounded queue.

    The queue holds at most 10 entries and the producer feeds it the numbers
    0..19. The coroutine first waits for the producer task and then for every
    consumer task to return, which requires each consumer to observe the
    shutdown sentinel.
    """
    queue = asyncio.Queue(maxsize=10)
    items = list(range(20))
    producers = [asyncio.create_task(producer(queue, items))]
    consumers = []
    for i in range(3):
        consumers.append(asyncio.create_task(consumer(queue, f"Worker-{i}")))
    await asyncio.gather(*producers)
    await asyncio.gather(*consumers)


asyncio.run(main())
14.9 注意事项
| 注意事项 | 说明 |
|---|---|
| 死锁预防 | Actor 模型和 CSP 避免了因锁顺序不当导致的传统死锁,但参与方相互等待消息或通道时仍可能永久阻塞 |
| 背压处理 | 高速生产者需要背压机制 |
| 错误隔离 | 使用 Supervisor 监控并发任务 |
| 共享状态 | 尽量减少共享状态,使用消息传递 |
| 性能调优 | 并发数、队列大小需要根据场景调优 |
14.10 小结
| 要点 | 说明 |
|---|---|
| FP 优势 | 不可变性天然适合并发 |
| Actor 模型 | 封装状态 + 消息传递(Erlang/Elixir) |
| STM | 事务性内存,原子性保证 |
| CSP | 通道通信(Go, Clojure core.async) |
| 并行计算 | par_iter, parMap 等并行化工具 |
扩展阅读
- Programming Erlang — Joe Armstrong
- Clojure for the Brave and True - Ch 13 — 并发章节
- Software Transactional Memory — Microsoft Research
- Rayon - Rust 并行迭代器
下一章:15 函数式错误处理