谱聚类简介
谱聚类(Spectral Clustering)是一种基于图论的聚类算法,它利用数据的相似性矩阵(拉普拉斯矩阵)的特征向量进行降维,然后在低维空间中使用传统聚类方法(如K-means)进行聚类。与K-means等基于距离的算法相比,谱聚类能处理非凸形状的簇,对噪声和异常值相对鲁棒,在图像分割、社交网络分析等领域应用广泛。

核心思想
谱聚类将数据点视为图中的节点,根据点之间的相似性构建带权无向图,然后通过图割(Graph Cut)问题转化为特征值分解问题。其关键在于对拉普拉斯矩阵进行谱分解(特征分解),利用特征向量捕捉数据的全局结构。
优缺点
优点
- 能识别非凸形状的簇。
- 对数据分布假设较少,适用于复杂结构。
- 基于图论,理论基础坚实。
缺点
- 计算复杂度高,需要特征值分解,适合中小规模数据。
- 需要预先指定聚类数k和相似性参数(如$\sigma$)。
- 对相似性矩阵构建敏感。
应用场景
- 图像分割:将像素视为节点,相似性基于颜色/纹理,分割图像区域。
- 社交网络社区发现:用户为节点,关系为边,识别社区结构。
- 文本聚类:文档相似性构建图,聚类相关主题。
- 生物信息学:基因表达数据聚类。
谱聚类原理
通俗的讲解
把那些复杂的数学公式先放一边,我用一个比喻来帮你理解谱聚类到底在干什么。想象一下,你是一个老师,要带两个班的学生去操场做游戏,但学生们已经混在一起了。
目标:把学生重新分成两个班。
传统方法(比如K-means)的局限
你会喊:“一班的同学,站到左边那个圆圈里!二班的站到右边!” 这方法假设学生们会乖乖地、直线跑向自己的位置。但如果两个班的学生是手拉手围成了两个大圈(而不是两堆),这个方法就会乱套——它强行画出的分界线会切到圈里,把本应同班的人分开。
谱聚类的聪明做法
- 第一步:观察关系(建图),你不动声色地观察。你发现:
- 同班的学生之间,手拉着手,关系紧密(相似度高)。
- 不同班的学生之间,只是站得近,但没拉手,关系松散(相似度低)。
- 你把每个学生看作一个点,如果他们关系紧密,就在他们之间连一条粗橡皮筋;关系一般就连细橡皮筋。这样,整个操场就变成了一张由点和橡皮筋构成的“关系网”。
- 第二步:找到最薄弱环节(图划分),现在,你的任务是把这张大网剪成两张大网(两个班),但你有两个原则:
- 剪断的橡皮筋总“力道”要最小:你希望只剪断那些本来就细的、脆弱的连接(不同班之间的连接)。
- 两张新网大小要差不多:你不能剪出一个只有1个人的网和另一个99人的网。
- 直接去找从哪里下剪子非常困难。谱聚类想出了一个绝妙的办法:
- 第三步:神奇的“谱”转换(降维)
- 你给每个学生发了一个魔法音叉。当你敲击这张“关系网”时,整个网络会以一种特定的模式振动。
- 最重要的振动模式(对应第二小特征值的特征向量)会让学生们根据他们所属的“小团体”自然排成一列。
- 在这个队列里,同一个班的学生会自动紧挨在一起,而不同班的学生之间会有一个明显的空隙。
- 第四步:简单划分(最终聚类)
- 现在,你不需要再去研究复杂的“关系网”了。你只需要看着这个排好的队伍,在最大的那个空隙处切一刀(或者用最简单的K-means方法分两组),就能完美地把两个班分开!
总结一下谱聚类的核心思想:
- 着眼点不同:它不关心学生在操场上的绝对位置(坐标),而是关心他们之间的关系(相似度)。
- 关键转换:通过分析“关系网”的振动模式(拉普拉斯矩阵的特征向量),把复杂的关系数据,映射到一个简单的一维或多维空间中。
- 问题简化:在这个新空间里,原本缠绕不清的数据点会变得井然有序、易于分离。这时,再用简单的分类方法(如K-means)就能轻松搞定。
所以,谱聚类的精髓就是:利用数据的“关系谱”,把难题转化成一个简单题。
- 它擅长处理:像两个交织的圆圈、螺旋形、流形这样形状不规则的复杂数据。
- 它的代价是:计算“关系网”和振动模式比较耗时,且如何定义“关系”(相似度)需要一个关键参数。
算法步骤
构建相似性矩阵:计算数据点两两之间的相似度,常用高斯核函数:
$$W_{ij} = \exp (-\frac{\|x_i – x_j\|^2}{2\sigma^2})$$
其中$\sigma$为尺度参数。
计算拉普拉斯矩阵:常用归一化拉普拉斯矩阵$L_{\text{sym}} = D^{-1/2}WD^{-1/2}$,其中D为度矩阵(对角矩阵,$ D_{ii} = \sum_j W_{ij}$)。
特征值分解:对 $L_{\text{sym}}$进行特征分解,选取前k个最小特征值对应的特征向量(k为聚类数),构成矩阵$U \in \mathbb{R}^{n \times k}$。
归一化行向量:将U的每一行归一化为单位范数,得到矩阵T。
聚类:将T的每一行视为k维空间中的点,使用K-means算法进行聚类。
映射回原数据:将聚类结果映射回原始数据点。
谱聚类Python实现
import numpy as np
import matplotlib.pyplot as plt
from sklearn.datasets import make_moons, make_circles, make_blobs
from sklearn.cluster import KMeans
from sklearn.metrics.pairwise import pairwise_kernels
import warnings
warnings.filterwarnings('ignore')
class SpectralClustering:
"""
谱聚类实现类
"""
def __init__(self, n_clusters=2, sigma=1.0, kernel='rbf', n_neighbors=10,
laplacian_type='sym_normalized', random_state=42):
"""
初始化参数
参数:
n_clusters: 聚类数量
sigma: 高斯核的带宽参数(sigma)
kernel: 核函数类型,'rbf'或'linear'
n_neighbors: 用于k近邻图的邻居数(如果使用稀疏矩阵)
laplacian_type: 拉普拉斯矩阵类型
'unnormalized': 未归一化
'sym_normalized': 对称归一化(推荐)
'random_walk': 随机游走归一化
random_state: 随机种子
"""
self.n_clusters = n_clusters
self.sigma = sigma
self.kernel = kernel
self.n_neighbors = n_neighbors
self.laplacian_type = laplacian_type
self.random_state = random_state
self.labels_ = None
self.eigenvalues_ = None
self.eigenvectors_ = None
def _compute_similarity_matrix(self, X, use_sparse=False):
"""
计算相似度矩阵
参数:
X: 输入数据,形状为(n_samples, n_features)
use_sparse: 是否使用稀疏矩阵(基于k近邻)
返回:
W: 相似度矩阵
"""
n_samples = X.shape[0]
if use_sparse:
# 使用k近邻构建稀疏相似度矩阵(节省内存)
from scipy.spatial import KDTree
W = np.zeros((n_samples, n_samples))
tree = KDTree(X)
for i in range(n_samples):
# 找到每个点的k个最近邻
distances, indices = tree.query(X[i], k=self.n_neighbors + 1)
# 计算与最近邻的相似度
for j, idx in enumerate(indices[1:]): # 跳过自己
dist = distances[j+1]
W[i, idx] = np.exp(-dist**2 / (2 * self.sigma**2))
W[idx, i] = W[i, idx] # 确保对称
else:
# 全连接图:使用高斯核/RBF核
if self.kernel == 'rbf':
# 自己实现高斯核
sq_dists = np.sum(X**2, axis=1).reshape(-1, 1) + \
np.sum(X**2, axis=1) - 2 * np.dot(X, X.T)
W = np.exp(-sq_dists / (2 * self.sigma**2))
else:
# 使用sklearn的核函数
W = pairwise_kernels(X, metric=self.kernel, gamma=1/(2*self.sigma**2))
return W
def _construct_laplacian(self, W):
"""
构建拉普拉斯矩阵
参数:
W: 相似度矩阵
返回:
L: 拉普拉斯矩阵
"""
# 计算度矩阵
D = np.diag(np.sum(W, axis=1))
if self.laplacian_type == 'unnormalized':
# 未归一化拉普拉斯矩阵: L = D - W
L = D - W
elif self.laplacian_type == 'sym_normalized':
# 对称归一化拉普拉斯矩阵: L_sym = I - D^{-1/2}WD^{-1/2}
# 防止除以零
D_inv_sqrt = np.diag(1.0 / np.sqrt(np.diag(D) + 1e-10))
L = np.eye(W.shape[0]) - D_inv_sqrt @ W @ D_inv_sqrt
elif self.laplacian_type == 'random_walk':
# 随机游走归一化拉普拉斯矩阵: L_rw = I - D^{-1}W
D_inv = np.diag(1.0 / (np.diag(D) + 1e-10))
L = np.eye(W.shape[0]) - D_inv @ W
else:
raise ValueError(f"不支持的拉普拉斯类型: {self.laplacian_type}")
return L
def _compute_eigenvectors(self, L, n_clusters):
"""
计算拉普拉斯矩阵的特征值和特征向量
参数:
L: 拉普拉斯矩阵
n_clusters: 需要提取的特征向量数量
返回:
eigenvectors: 前k个最小特征值对应的特征向量
eigenvalues: 特征值
"""
# 计算特征值和特征向量
eigenvalues, eigenvectors = np.linalg.eigh(L)
# 对特征值排序(从小到大)
sorted_indices = np.argsort(eigenvalues)
eigenvalues = eigenvalues[sorted_indices]
eigenvectors = eigenvectors[:, sorted_indices]
# 选取前n_clusters个特征向量(跳过第一个,因为对应特征值为0)
# 注意:有时需要跳过前m个特征向量,取决于连通分量数
selected_eigenvectors = eigenvectors[:, 1:n_clusters+1]
# 行归一化(重要步骤!)
# 将每个特征向量行归一化为单位长度
row_norms = np.linalg.norm(selected_eigenvectors, axis=1, keepdims=True)
row_norms[row_norms == 0] = 1 # 避免除以零
normalized_eigenvectors = selected_eigenvectors / row_norms
return normalized_eigenvectors, eigenvalues
def fit(self, X):
"""
拟合谱聚类模型
参数:
X: 输入数据,形状为(n_samples, n_features)
返回:
self
"""
# 步骤1: 计算相似度矩阵
W = self._compute_similarity_matrix(X, use_sparse=False)
# 步骤2: 构建拉普拉斯矩阵
L = self._construct_laplacian(W)
# 步骤3: 计算特征向量
eigenvectors, eigenvalues = self._compute_eigenvectors(L, self.n_clusters)
self.eigenvalues_ = eigenvalues
self.eigenvectors_ = eigenvectors
# 步骤4: 对新特征向量进行K-means聚类
kmeans = KMeans(n_clusters=self.n_clusters, random_state=self.random_state)
self.labels_ = kmeans.fit_predict(eigenvectors)
return self
def fit_predict(self, X):
"""
拟合模型并返回聚类标签
参数:
X: 输入数据
返回:
labels: 聚类标签
"""
self.fit(X)
return self.labels_
# ==================== 示例和可视化 ====================
def generate_test_datasets():
"""
生成测试数据集
"""
np.random.seed(42)
# 数据集1: 两个交错的半月形(谱聚类的经典用例)
X_moons, y_moons = make_moons(n_samples=200, noise=0.1, random_state=42)
# 数据集2: 两个同心圆
X_circles, y_circles = make_circles(n_samples=200, factor=0.5, noise=0.05, random_state=42)
# 数据集3: 三个高斯团块(K-means表现好)
X_blobs, y_blobs = make_blobs(n_samples=200, centers=3, cluster_std=1.0, random_state=42)
datasets = [
(X_moons, y_moons, "Moons (半月形)"),
(X_circles, y_circles, "Circles (同心圆)"),
(X_blobs, y_blobs, "Blobs (团块)")
]
return datasets
def plot_results(X, y_true, y_pred, dataset_name, ax):
"""
绘制聚类结果
"""
# 绘制真实标签
ax[0].scatter(X[:, 0], X[:, 1], c=y_true, cmap='viridis',
edgecolors='k', alpha=0.7, s=50)
ax[0].set_title(f'{dataset_name} - 真实分布')
ax[0].set_xlabel('特征1')
ax[0].set_ylabel('特征2')
ax[0].grid(True, alpha=0.3)
# 绘制预测标签
ax[1].scatter(X[:, 0], X[:, 1], c=y_pred, cmap='viridis',
edgecolors='k', alpha=0.7, s=50)
ax[1].set_title(f'{dataset_name} - 谱聚类结果')
ax[1].set_xlabel('特征1')
ax[1].set_ylabel('特征2')
ax[1].grid(True, alpha=0.3)
def visualize_eigenvectors(eigenvectors, labels, dataset_name, ax):
"""
可视化特征向量空间
"""
if eigenvectors.shape[1] >= 2:
# 如果有至少2个特征向量,绘制前两个
ax.scatter(eigenvectors[:, 0], eigenvectors[:, 1], c=labels,
cmap='viridis', edgecolors='k', alpha=0.7, s=50)
ax.set_title(f'{dataset_name} - 特征向量空间')
ax.set_xlabel('第一特征向量')
ax.set_ylabel('第二特征向量')
ax.grid(True, alpha=0.3)
else:
# 如果只有一个特征向量,绘制一维分布
ax.scatter(eigenvectors[:, 0], np.zeros_like(eigenvectors[:, 0]),
c=labels, cmap='viridis', edgecolors='k', alpha=0.7, s=50)
ax.set_title(f'{dataset_name} - 特征向量空间(一维)')
ax.set_xlabel('第一特征向量')
ax.set_yticks([])
ax.grid(True, alpha=0.3)
def plot_eigenvalues(eigenvalues, dataset_name, ax):
"""
绘制特征值分布
"""
n_eigenvalues = min(10, len(eigenvalues))
ax.plot(range(1, n_eigenvalues+1), eigenvalues[1:n_eigenvalues+1],
'bo-', linewidth=2, markersize=8)
ax.set_title(f'{dataset_name} - 特征值分布')
ax.set_xlabel('特征值索引')
ax.set_ylabel('特征值')
ax.grid(True, alpha=0.3)
# 标记特征值间隔最大的地方
if len(eigenvalues) > 2:
gaps = eigenvalues[1:] - eigenvalues[:-1]
max_gap_idx = np.argmax(gaps[:n_eigenvalues-1]) + 1
ax.scatter(max_gap_idx+1, eigenvalues[max_gap_idx],
color='red', s=100, zorder=5, label=f'最大间隔: k={max_gap_idx}')
ax.legend()
def compare_with_kmeans(X, y_true, dataset_name):
"""
对比谱聚类和K-means的结果
"""
# 谱聚类
spectral = SpectralClustering(n_clusters=len(np.unique(y_true)),
sigma=0.2, random_state=42)
y_spectral = spectral.fit_predict(X)
# K-means
kmeans = KMeans(n_clusters=len(np.unique(y_true)), random_state=42)
y_kmeans = kmeans.fit_predict(X)
# 计算准确率(调整标签匹配)
from sklearn.metrics import adjusted_rand_score
spectral_score = adjusted_rand_score(y_true, y_spectral)
kmeans_score = adjusted_rand_score(y_true, y_kmeans)
# 可视化
fig, axes = plt.subplots(1, 3, figsize=(15, 4))
# 真实分布
axes[0].scatter(X[:, 0], X[:, 1], c=y_true, cmap='viridis',
edgecolors='k', alpha=0.7, s=50)
axes[0].set_title(f'{dataset_name} - 真实分布')
axes[0].set_xlabel('特征1')
axes[0].set_ylabel('特征2')
# 谱聚类结果
axes[1].scatter(X[:, 0], X[:, 1], c=y_spectral, cmap='viridis',
edgecolors='k', alpha=0.7, s=50)
axes[1].set_title(f'谱聚类 (ARI={spectral_score:.3f})')
axes[1].set_xlabel('特征1')
axes[1].set_ylabel('特征2')
# K-means结果
axes[2].scatter(X[:, 0], X[:, 1], c=y_kmeans, cmap='viridis',
edgecolors='k', alpha=0.7, s=50)
axes[2].set_title(f'K-means (ARI={kmeans_score:.3f})')
axes[2].set_xlabel('特征1')
axes[2].set_ylabel('特征2')
plt.tight_layout()
plt.show()
return spectral_score, kmeans_score
# ==================== 主程序:演示谱聚类 ====================
if __name__ == "__main__":
print("=" * 60)
print("谱聚类Python实现演示")
print("=" * 60)
# 生成测试数据
print("\n生成测试数据集...")
datasets = generate_test_datasets()
# 测试谱聚类在不同数据集上的表现
for i, (X, y_true, dataset_name) in enumerate(datasets):
print(f"\n{'='*40}")
print(f"数据集: {dataset_name}")
print(f"数据形状: {X.shape}")
print(f"真实类别数: {len(np.unique(y_true))}")
# 创建谱聚类实例
spectral = SpectralClustering(
n_clusters=len(np.unique(y_true)),
sigma=0.2, # 需要根据数据集调整sigma
laplacian_type='sym_normalized',
random_state=42
)
# 训练并预测
y_pred = spectral.fit_predict(X)
# 计算评估指标
from sklearn.metrics import adjusted_rand_score, normalized_mutual_info_score
ari = adjusted_rand_score(y_true, y_pred)
nmi = normalized_mutual_info_score(y_true, y_pred)
print(f"谱聚类结果:")
print(f" - 调整兰德指数(ARI): {ari:.4f}")
print(f" - 归一化互信息(NMI): {nmi:.4f}")
print(f" - 特征值数量: {len(spectral.eigenvalues_)}")
print(f" - 前5个特征值: {spectral.eigenvalues_[:5].round(4)}")
# 可视化结果
fig, axes = plt.subplots(2, 3, figsize=(15, 10))
fig.suptitle(f'谱聚类演示 - {dataset_name}', fontsize=16)
# 第一行:聚类结果对比
plot_results(X, y_true, y_pred, dataset_name, axes[0])
# 第二行左:特征向量空间可视化
visualize_eigenvectors(spectral.eigenvectors_, y_pred, dataset_name, axes[1, 0])
# 第二行中:特征值分布
plot_eigenvalues(spectral.eigenvalues_, dataset_name, axes[1, 1])
# 第二行右:相似度矩阵可视化
similarity_matrix = spectral._compute_similarity_matrix(X)
im = axes[1, 2].imshow(similarity_matrix, cmap='hot', aspect='auto')
axes[1, 2].set_title(f'{dataset_name} - 相似度矩阵')
axes[1, 2].set_xlabel('样本索引')
axes[1, 2].set_ylabel('样本索引')
plt.colorbar(im, ax=axes[1, 2])
plt.tight_layout()
plt.show()
# 对比K-means
print(f"\n与K-means对比:")
spectral_score, kmeans_score = compare_with_kmeans(X, y_true, dataset_name)
# ==================== 参数调优示例 ====================
print(f"\n{'='*40}")
print("参数调优演示:sigma对谱聚类的影响")
print(f"{'='*40}")
# 使用半月形数据
X_moons, y_moons = make_moons(n_samples=200, noise=0.1, random_state=42)
# 测试不同的sigma值
sigma_values = [0.05, 0.1, 0.2, 0.5, 1.0]
fig, axes = plt.subplots(2, 3, figsize=(15, 10))
axes = axes.flatten()
for idx, sigma in enumerate(sigma_values):
# 使用不同的sigma进行谱聚类
spectral = SpectralClustering(
n_clusters=2,
sigma=sigma,
laplacian_type='sym_normalized',
random_state=42
)
y_pred = spectral.fit_predict(X_moons)
ari = adjusted_rand_score(y_moons, y_pred)
# 可视化结果
ax = axes[idx]
ax.scatter(X_moons[:, 0], X_moons[:, 1], c=y_pred,
cmap='viridis', edgecolors='k', alpha=0.7, s=50)
ax.set_title(f'sigma={sigma}\nARI={ari:.3f}')
ax.set_xlabel('特征1')
ax.set_ylabel('特征2')
ax.grid(True, alpha=0.3)
# 添加一个真实分布的图
ax = axes[-1]
ax.scatter(X_moons[:, 0], X_moons[:, 1], c=y_moons,
cmap='viridis', edgecolors='k', alpha=0.7, s=50)
ax.set_title(f'真实分布\n类别数=2')
ax.set_xlabel('特征1')
ax.set_ylabel('特征2')
ax.grid(True, alpha=0.3)
plt.suptitle('不同sigma参数对谱聚类效果的影响', fontsize=16)
plt.tight_layout()
plt.show()
# ==================== 使用示例 ====================
print(f"\n{'='*40}")
print("谱聚类使用示例")
print(f"{'='*40}")
# 示例1:基本使用
print("\n1. 基本使用:")
X, _ = make_moons(n_samples=100, noise=0.1, random_state=42)
# 创建模型
spectral = SpectralClustering(n_clusters=2, sigma=0.2)
# 拟合和预测
labels = spectral.fit_predict(X)
print(f" 聚类完成! 聚类标签: {labels[:10]}...") # 显示前10个
# 示例2:获取特征向量
print("\n2. 获取特征向量:")
print(f" 特征向量形状: {spectral.eigenvectors_.shape}")
print(f" 特征值: {spectral.eigenvalues_[:5].round(4)}...") # 显示前5个
# 示例3:自定义参数
print("\n3. 自定义参数:")
spectral_custom = SpectralClustering(
n_clusters=3,
sigma=0.5,
kernel='rbf',
laplacian_type='sym_normalized',
random_state=42
)
print(" 自定义参数模型创建成功!")
print(f"\n{'='*40}")
print("谱聚类演示完成!")
print(f"{'='*40}")
谱聚类使用示例
import numpy as np
from sklearn.cluster import SpectralClustering
from sklearn.datasets import make_moons
import matplotlib.pyplot as plt
# 生成示例数据(两个半月形,非凸数据集)
X, y = make_moons(n_samples=200, noise=0.05, random_state=42)
# 创建谱聚类模型
# n_clusters: 聚类数量
# affinity: 相似性度量,'nearest_neighbors'或'rbf'(高斯核)
# n_neighbors: 当affinity='nearest_neighbors'时的邻居数
# random_state: 随机种子,确保可重复性
spectral = SpectralClustering(
n_clusters=2,
affinity='nearest_neighbors',
n_neighbors=10,
random_state=42
)
# 执行聚类
labels = spectral.fit_predict(X)
# 可视化结果
plt.figure(figsize=(10, 5))
plt.subplot(1, 2, 1)
plt.scatter(X[:, 0], X[:, 1], c=y, cmap='viridis', s=50)
plt.title('原始数据(真实标签)')
plt.subplot(1, 2, 2)
plt.scatter(X[:, 0], X[:, 1], c=labels, cmap='viridis', s=50)
plt.title('谱聚类结果')
plt.show()
关键参数说明
- n_clusters:聚类数量,需要预先指定。
- affinity:构建相似性矩阵的方法:
- `’rbf’`:使用高斯核函数,需要设置`gamma`参数。
- `’nearest_neighbors’`:基于k近邻图构建,需要设置`n_neighbors`。
- `’precomputed’`:使用预先计算的相似性矩阵。
- n_neighbors:当`affinity=’nearest_neighbors’`时,构建k近邻图的邻居数。
- gamma:当`affinity=’rbf’`时,高斯核的带宽参数(公式中的1/(2σ²))。
- random_state:随机种子,确保K-means初始化的可重复性。
注意事项
- 谱聚类对相似性矩阵和参数选择敏感,建议通过网格搜索优化参数。
- 对于大规模数据,考虑使用`eigen_solver=’amg’`(代数多重网格)加速特征值分解。
- 数据预处理(如标准化)可能改善聚类效果。



