机器学习, 法→原理

子空间聚类算法CLIQUE

钱魏Way · · 9 次浏览

CLIQUE(CLustering In QUEst)是一种经典的子空间聚类算法,由IBM Almaden研究中心在1998年提出。它专门用于从高维数据中发现密度相似的簇,且这些簇可能仅存在于某些子空间(特征的子集)中,而非全维空间。

CLIQUE简介

产生背景

高维数据的爆发与“维度灾难

  • 数据环境变化:随着信息技术、商业数据库(如超市交易记录)、科学实验(如基因微阵列)的普及,可收集的数据维度(特征/属性)急剧增长,从几十维跃升至成百上千维。
  • 核心矛盾:传统聚类算法(如K-Means、DBSCAN)严重依赖全维度空间的距离度量(如欧氏距离)。在高维空间中,所有数据点之间的距离会变得极其相似(数据极度稀疏),这种现象被称为“维度灾难”。这使得基于距离的相似性计算失效,传统算法在全维度空间中难以找到有意义的簇。

簇存在于“子空间”而非全空间

  • 现实观察:在实际高维数据中,有意义的簇通常只由一部分相关属性决定,而其他属性可能是无关的噪声。例如:
    • 客户细分:一个簇可能由“年龄”和“收入”定义,而“邮编”在该簇内是随机分布。
    • 基因表达:某些基因只在特定实验条件下(部分维度)协同表达。
  • 关键洞察:在全维度空间中看似无结构的数据,在某些低维子空间投影中可能呈现出清晰的密集簇。因此,聚类任务不仅是要找簇,更要自动发现哪些子空间存在簇。

前人方法的局限

在CLIQUE之前,主流应对高维聚类问题的方法存在明显不足:

  • 特征选择/降维(如PCA):为整个数据集选择一个全局的低维投影。弊端:可能会扭曲或掩盖仅存在于特定子空间中的簇,因为不同簇可能依赖于不同的特征子集。
  • 传统全空间聚类算法:如上所述,在高维下效果很差。
  • 需要手动指定相关维度:这要求领域专家预先知道哪些特征重要,不适用于探索性数据挖掘。

技术思潮的融合与创新

CLIQUE的设计巧妙地借鉴了当时两个火热领域的核心思想:

  • 来自关联规则挖掘的“Apriori原理”:用于高效搜索频繁项集。CLIQUE将其创造性应用于子空间搜索,提出“单调性原理”:如果一个k维单元是密集的,那么它在所有(k-1)维投影中的父单元也必须是密集的。这允许算法自底向上、逐层生成并剪枝候选子空间,极大地避免了组合爆炸。
  • 来自密度聚类(如DBSCAN)的“基于密度”思想:通过定义“密集单元”来发现任意形状的簇,并对噪声鲁棒。但CLIQUE将其与网格化结合,将连续空间离散化为单元,通过统计单元内点数来定义密度,大幅提升了计算效率。

核心思想

CLIQUE 的核心思想可以概括为:通过网格化和密度连接来发现高维数据中的簇,并利用子空间搜索的单调性原理,自动且高效地定位那些簇所存在的低维子空间。

这个思想基于一个深刻且现实的观察:在高维数据中,有意义的簇通常只在一部分属性(子空间)上密集分布,而在其他属性上是随机或稀疏的。因此,其核心围绕着以下三个相互关联的支柱展开:

第一支柱:网格化与稠密单元

核心思想:将连续的、高维的数据空间离散化为网格单元,用单元的“数据点密度”来定义簇。

  • 网格化:对每个维度进行等宽划分,将整个数据空间划分为一个个超矩形单元。这就像为数据世界绘制了一张带坐标网格的地图。
  • 稠密单元:如果一个单元内包含的数据点数量超过了预设的密度阈值,它就被标记为“稠密单元”。稠密单元被认为是潜在簇的“砖块”。

为什么这样做?

  • 降维计算:将复杂的连续空间密度计算,简化为对离散单元的计数,计算效率极高。
  • 定义清晰:为“簇”提供了一个明确的、基于统计的定义。

第二支柱:向下闭包性与自底向上搜索

核心思想:利用“单调性”原理,像搭积木一样从低维到高维,系统地、高效地搜索所有可能存在簇的子空间。

这是CLIQUE最具创新性的部分,灵感来源于关联规则挖掘中的Apriori算法。

  • 向下闭包性:如果一个 k维的单元是稠密的,那么它在所有 (k-1)维空间上的投影(父单元)也必须是稠密的。
    • 直观理解:如果一个区域在“收入-年龄”二维平面上很密集,那么在单独看“收入”或“年龄”这两个一维分布时,该区域对应的区间也必然是密集的。反之则不成立(低维密集,高维不一定密集)。
  • 自底向上搜索流程:
    • 开始:先找出所有1维子空间(即单个属性)中的稠密单元。
    • 迭代:基于已经找到的(k-1)维稠密单元,通过组合(类似连接操作)生成候选的k维单元。
    • 剪枝:利用向下闭包性进行强力剪枝——如果一个候选k维单元的任何一个(k-1)维父单元不是稠密的,那么这个候选单元不可能是稠密的,可以直接丢弃,无需扫描数据验证。
    • 终止:直到找不到更高维的候选稠密单元为止。

为什么这样做?

  • 自动化:算法自动发现哪些属性组合(子空间)存在密集模式,无需人工指定。
  • 高效性:剪枝策略避免了在所有可能的子空间组合(是指数级数量)上进行穷举搜索,是算法可行的关键。

第三支柱:连通性与最小化描述

核心思想:将邻近的“砖块”(稠密单元)连接起来形成完整的“建筑”(簇),并用最简洁的规则来描述它。

  • 簇的生成:在找到的每个稠密集子空间中,将共享边界的稠密单元连接起来,形成一个连通的区域。这个区域就是一个簇。它可以是任意形状,不限于球形。
  • 最小化描述:为每个生成的簇,找到一个最精简的规则集来覆盖其所有单元。例如:簇 = {年龄 ∈ [20,30] ∧ 收入 ∈ [50k,80k]}。这使聚类结果具有极强的可解释性,可以直接转化为业务知识。

为什么这样做?

  • 捕获任意形状:基于连通性,能发现传统基于距离的方法难以发现的复杂形状簇。
  • 结果可解释:最终的输出不仅是数据点的分组,更是清晰的、人类可读的“如果-那么”规则,这是其巨大优势。

核心思想总结

CLIQUE的智慧在于,它将一个极其复杂的高维聚类问题,分解为三个可计算、可管理的阶段:

  • 量化(网格与密度):将“密集区域”这个模糊概念,量化为“超过阈值的网格单元”。
  • 搜索(单调性剪枝):利用“局部密集则整体必密集”的逻辑,像侦探一样高效排除大量不可能存在簇的区域,锁定目标子空间。
  • 构造与解释(连接与描述):将找到的“证据碎片”(稠密单元)拼合成完整的“画像”(簇),并用最简语言(规则)描述出来。

一个生动的比喻

想象在一片广阔的多维星海中寻找文明(簇)。CLIQUE的做法是:

  • 先绘制星图(网格化)。
  • 然后从单个星座(1维)开始观察,记录恒星密集的星座。
  • 接着根据一个关键法则:“如果一个双星系统(2维)密集,那么组成它的两颗单星(1维)所在区域也必然密集”。利用这个法则,只在那些单星密集的区域去寻找可能密集的双星系统,大大缩小了搜索范围。
  • 最后,将邻近的密集双星、三星系统连接起来,标记为一个文明星团,并记录下它的精确坐标范围(规则描述)。

与同类方法对比

CLIQUE 作为子空间聚类领域的开创性算法,与同类方法的对比可以从几个维度展开:核心策略、目标、优势与局限。下表清晰地勾勒出它与几类代表性方法的区别:

方法类别 典型代表 核心策略 与 CLIQUE 的关键对比
传统全空间聚类 K-Means, DBSCAN 在所有维度上计算距离/密度进行聚类。 根本差异:CLIQUE 认为簇存在于子空间,能避免无关维度干扰;传统方法受“维度灾难”影响,在高维下失效。
基于距离/质心的子空间聚类 PROCLUS, ORCLUS 迭代选择子空间并为簇分配维度,基于距离度量。 搜索方式:CLIQUE 是自底向上、穷举剪枝式搜索;PROCLUS 是自顶向下、迭代采样式搜索。
簇形状:CLIQUE 基于密度,可发现任意形状簇;PROCLUS 基于距离,偏向球形或凸形簇。
输出:CLIQUE 提供可解释的规则;PROCLUS 给出簇的维度列表。
基于密度的子空间聚类 SUBCLU 将 DBSCAN 的核心思想(密度连接)直接扩展至子空间。 密度定义:CLIQUE 基于网格单元计数,边界生硬;SUBCLU 基于邻域和核心点,边界更自然。
精度与效率:SUBCLU 聚类质量更高,能识别更精细的密度变化,但计算成本远高于CLIQUE 的网格法。
结果:两者都能发现任意形状簇,但 SUBCLU 结果通常更准确。
降维后聚类 PCA + K-Means 先用线性变换(如PCA)将数据投影到全局低维空间,再聚类。 子空间性质:CLIQUE 发现的是轴平行(axis-parallel) 的子空间(原始特征子集);PCA 发现的是任意方向的线性子空间(特征组合)。
簇适用性:CLIQUE 适合不同簇依赖不同特征的数据;PCA 为全体数据寻找一个最优的全局低维视图,可能掩盖局部簇。

与 PROCLUS 对比:搜索范式的根本不同

CLIQUE 和 PROCLUS 代表了子空间聚类的两大经典范式。

  • CLIQUE (自底向上,基于关联规则):
    • 过程:像搭积木,从1维开始,利用“密集单元”的向下闭包性(单调性)逐层(2维、3维…)构建和验证候选子空间,最后将稠密单元连成簇。
    • 优点:相对系统、全面,能保证发现所有满足密度阈值的轴平行子空间簇,结果可解释性强。
    • 缺点:参数(网格粒度、密度阈值)敏感,网格边界可能导致簇的割裂。
  • PROCLUS (自顶向下,基于K-Medoids):
    • 过程:先初始化寻找可能的簇中心点(Medoids),然后为每个簇分配一组相关的维度(子空间),再迭代调整中心和维度,直到收敛。
    • 优点:效率通常更高,对参数依赖相对较小,簇的描述简洁(每个簇一个维度集)。
    • 缺点:需要预设簇数目K,对初始中心点敏感,偏向发现球形簇,可能错过低维的、基于密度的复杂形状簇。

简单来说:CLIQUE 是“发现密集区域并描述之”,PROCLUS 是“给簇分配一个特征子集”。

与 SUBCLU 对比:密度定义的精髓差异

两者都基于“密度”,但对密度的实现天差地别。

  • CLIQUE (网格密度):
    • 将空间划分为固定的网格,统计每个网格里的点数。密度是全局的、平均的。
    • 缺点:网格大小固定,可能将一个自然连续的密集区域切割到多个单元中(边界问题),或者因阈值设置而丢失或合并簇。
  • SUBCLU (基于点的密度):
    • 直接沿用 DBSCAN 的定义:一个点是核心点,如果其ε-邻域内点的数量超过阈值。密度是局部的、基于点的。
    • 优点:能更自然、更精确地刻画任意形状的簇和密度变化,对参数设置更鲁棒。
    • 缺点:在高维子空间中进行基于点的邻域查询,计算复杂度远高于CLIQUE的网格计数,可扩展性是其挑战。

本质区别:CLIQUE 是“数格子”,SUBCLU 是“量半径”。前者快但粗糙,后者准但慢。

在算法家族内的演进

CLIQUE 启发了一系列改进算法,它们构成了一个家族:

  • MAFIA: 主要改进网格划分,采用自适应(数据驱动的)网格,而非等宽网格,能更贴合数据分布,提升聚类质量。
  • ENCLUS: 改进了子空间搜索标准。CLIQUE 用“密度”,ENCLUS 用“熵”来衡量子空间的兴趣度(聚类潜力),旨在找到更有趣、更不均匀分布的子空间。
  • FIRES: 一种框架式方法,先在全维空间找到低维的“种子簇”,再将其合并或扩展到相关子空间,提供了另一种搜索思路。

优缺点与应用场景

优点

  • 自动化子空间发现
    • 核心优势:无需预先指定哪些维度与聚类相关。算法能自动识别出存在密集簇的特征子集,这是其最根本的贡献。
  • 对输入顺序不敏感
    • 由于基于全局网格划分和计数,结果的确定性和可重现性高,不受数据点输入顺序影响。
  • 可扩展性高,效率突出
    • 网格化计数:将复杂的邻域查询简化为对网格单元的整数累加,计算成本远低于基于距离的算法。
    • 单调性剪枝:利用向下闭包性大幅减少需要检查的高维候选子空间数量,避免了组合爆炸。
    • 非常适合处理大规模、高维度的数值型数据集。
  • 对噪声数据鲁棒
    • 低密度单元被自然过滤为噪声,不参与簇的形成,增强了模型的稳定性。
  • 可发现任意形状的簇
    • 基于密度单元的连接,而非基于距离的质心,因此能识别非球形、任意形状的簇结构。
  • 结果可解释性强
    • 输出不仅是簇的成员列表,更是清晰的“IF-THEN”形式规则(例如:IF 年龄 ∈ [20,30] AND 收入 ∈ [50k,80k] THEN 属于簇A),可直接转化为业务洞察。

缺点

  • 参数敏感且难以设置
    • 网格划分参数 (ξ):每个维度划分的区间数。太小会导致粒度太粗,可能合并多个簇;太大则粒度太细,可能将一个簇分解,并产生大量空单元。
    • 密度阈值 (τ):区分稠密与稀疏单元的阈值。过高会遗漏真实但较稀疏的簇;过低会产生大量假阳性簇(将噪声区域误判为簇)。
    • 参数设置高度依赖先验知识或反复试验。
  • 网格边界问题
    • 边界割裂:一个自然的密集区域可能被网格边界切割成多个相邻的稠密单元,虽然算法会连接它们,但切割仍可能影响簇的“自然”边界识别精度。
    • 对数据分布和坐标轴对齐敏感:网格是轴平行的(Axis-aligned)。如果簇的边界是斜的或与坐标轴不平行,网格化会引入较大误差。
  • 忽略属性间的相关性
    • 网格划分完全独立于每个维度进行,忽略了特征之间的相关性(如:年龄和收入可能存在的非线性关系)。这限制了其发现非轴平行子空间簇的能力。
  • 可能产生冗余簇描述
    • 一个簇可能被多个重叠或相邻的规则集描述,需要后处理来生成最小覆盖集。
  • 所有维度被平等对待
    • 在子空间搜索的初始阶段(1维),算法平等地检查所有单个维度。如果大多数维度都是噪声,计算资源可能被浪费。

典型应用场景

CLIQUE特别适合于以下类型的任务和数据:

  • 高维数据的探索性数据分析
    • 场景:面对一个维度很高(数十维至数百维)、内在结构未知的数据集,需要快速了解“哪些特征组合可能隐藏着有意义的群体”。
    • 示例:客户画像分析、基因表达数据的初步模式发现。
  • 需要可解释性规则的商业智能
    • 场景:聚类目标不仅是分组,更是要生成可直接用于决策的、人类可读的业务规则。
    • 示例:
      • 市场细分:发现规则如“年轻(20-30岁) & 高收入(>50K) & 高频网购”的客户群,以便进行精准营销。
      • 风险控制:识别具有“交易频率异常高 & 单笔金额中等 & 登录地点分散”特征的潜在欺诈账户规则。
    • 处理相对稀疏的高维数据
      • 场景:数据在全维空间中非常稀疏(如文本的单词向量、用户-商品矩阵),但在某些低维子空间中可能存在密集块。
      • 示例:文档聚类中,某些主题可能仅由一部分关键词(子空间)决定;推荐系统中,特定用户群可能只对某类商品(子空间)有共同偏好。
    • 对聚类形状无球形假设,且计算资源有限
      • 场景:需要发现任意形状的簇,同时数据量很大,计算效率是重要考量。

不适用或慎用场景

  • 维度极低(如2D、3D)的数据
    • 原因:有更高效、更精确的专用算法(如DBSCAN、OPTICS)。CLIQUE的网格化在此显得笨重且不必要。
  • 要求簇边界极其精确
    • 原因:网格边界效应会导致结果不够精确。
  • 数据中存在大量复杂的相关性或非线性结构
    • 原因:轴平行的网格无法有效捕获这些模式。应考虑基于流形学习或相关子空间的聚类方法。
  • 无法接受参数调优
    • 原因:CLIQUE的效果严重依赖参数调整,在缺乏调优经验或自动化工具时,效果可能不稳定。

CLIQUE算法实现

CLIQUE算法详细步骤

数据预处理

  • 将所有特征归一化到[0,1]范围
  • 确定每个维度的划分区间数ξ

网格划分

  • 每个维度i等分为ξ个等宽区间
  • 整个d维空间被划分为ξ^d个单元
  • 单元表示:每个单元用其在每个维度的区间索引表示

识别稠密单元

# 伪代码逻辑
for 每个维度组合:
    为每个单元计数数据点
    如果计数 ≥ τ(总点数 * 阈值比例):
        标记为稠密单元

自底向上搜索子空间

# 基于向下闭包性
D1 = 所有1维稠密单元的集合
k = 1
while Dk不为空:
    # 生成候选(k+1)维单元
    Ck+1 = 从Dk中生成候选(k+1)维单元
    
    # 剪枝:移除任何(k+1)维候选,如果其任意k维投影不在Dk中
    剪枝Ck+1
    
    # 扫描数据,计算候选单元的密度
    for 每个候选单元c in Ck+1:
        计算c中的数据点计数
        if 计数 ≥ τ:
            将c加入Dk+1
    k = k + 1

生成簇

  • 在每个子空间中,连接相邻的稠密单元形成簇
  • 相邻性:两个单元在每一维度上的索引最多相差1

生成最小化描述

  • 为每个簇找到最小覆盖规则
  • 规则形式:[维度i ∈ 范围] AND [维度j ∈ 范围] …

Python实现

import numpy as np
from collections import defaultdict, deque
from typing import List, Dict, Tuple, Set, Any
import itertools

class CLIQUE:
    def __init__(self, xi: int = 10, tau: float = 0.2):
        """
        初始化CLIQUE算法
        
        参数:
        xi: 每个维度的划分区间数
        tau: 密度阈值比例 (单元内点数占总点数的比例)
        """
        self.xi = xi
        self.tau = tau
        self.dense_units = defaultdict(list)  # 存储各维度稠密单元
        self.clusters = []  # 存储最终簇
        self.rules = []  # 存储簇的描述规则
        
    def fit(self, X: np.ndarray) -> 'CLIQUE':
        """
        执行CLIQUE聚类
        
        参数:
        X: 输入数据 (n_samples, n_features)
        
        返回:
        self: 训练好的模型
        """
        self.n_samples, self.n_features = X.shape
        self.X = X.copy()
        
        # 1. 数据归一化
        self.X_normalized = self._normalize(X)
        
        # 2. 网格划分
        self.grid_cells = self._create_grid()
        
        # 3. 自底向上搜索稠密单元
        self._find_dense_units()
        
        # 4. 从稠密单元生成簇
        self._form_clusters()
        
        # 5. 生成簇标签
        self.labels_ = self._generate_labels()
        
        return self
    
    def _normalize(self, X: np.ndarray) -> np.ndarray:
        """将数据归一化到[0,1]范围"""
        X_min = X.min(axis=0)
        X_max = X.max(axis=0)
        # 避免除零
        X_range = X_max - X_min
        X_range[X_range == 0] = 1.0
        
        return (X - X_min) / X_range
    
    def _create_grid(self) -> Dict[Tuple[int, ...], List[int]]:
        """创建网格并分配数据点到单元格"""
        # 计算区间宽度
        interval_width = 1.0 / self.xi
        
        # 初始化网格字典:键是单元格坐标,值是点索引列表
        grid = defaultdict(list)
        
        # 分配每个数据点到单元格
        for i, point in enumerate(self.X_normalized):
            # 计算点在每个维度上的区间索引
            cell_coords = tuple(int(coord // interval_width) for coord in point)
            # 处理边界情况:当coord == 1.0时
            cell_coords = tuple(min(c, self.xi - 1) for c in cell_coords)
            grid[cell_coords].append(i)
        
        return grid
    
    def _find_dense_units(self):
        """自底向上搜索稠密单元"""
        # 阈值:最小点数
        min_points = self.tau * self.n_samples
        
        # 步骤1: 找到所有1维稠密单元
        D1 = []  # 存储1维稠密单元
        for dim in range(self.n_features):
            # 计算该维度每个区间的点数
            interval_counts = np.zeros(self.xi, dtype=int)
            
            for cell_coords, point_indices in self.grid_cells.items():
                interval_idx = cell_coords[dim]
                interval_counts[interval_idx] += len(point_indices)
            
            # 标记稠密区间
            for interval_idx, count in enumerate(interval_counts):
                if count >= min_points:
                    dense_unit = (dim, interval_idx)
                    D1.append(dense_unit)
        
        self.dense_units[1] = D1
        
        # 步骤2: 自底向上搜索更高维稠密单元
        k = 1
        while k < self.n_features and len(self.dense_units[k]) > 0:
            # 生成候选(k+1)维单元
            candidates = self._generate_candidates(k)
            
            if not candidates:
                break
                
            # 扫描数据,验证候选单元密度
            Dk1 = []  # 存储(k+1)维稠密单元
            
            for candidate in candidates:
                # 计算候选单元中的数据点数
                count = self._count_points_in_unit(candidate)
                
                if count >= min_points:
                    Dk1.append(candidate)
            
            if Dk1:
                self.dense_units[k + 1] = Dk1
                k += 1
            else:
                break
    
    def _generate_candidates(self, k: int) -> List[Tuple]:
        """生成候选(k+1)维单元"""
        candidates = []
        
        if k not in self.dense_units or not self.dense_units[k]:
            return candidates
        
        Dk = self.dense_units[k]
        
        # 如果k=1,生成2维候选
        if k == 1:
            # 组合不同的维度
            for i in range(len(Dk)):
                for j in range(i + 1, len(Dk)):
                    dim1, interval1 = Dk[i]
                    dim2, interval2 = Dk[j]
                    
                    if dim1 != dim2:
                        # 检查所有(k-1)维投影是否稠密
                        # 对于2维,(1,2)的1维投影是(1)和(2),都在D1中
                        candidate = ((dim1, interval1), (dim2, interval2))
                        # 按维度排序
                        candidate = tuple(sorted(candidate))
                        candidates.append(candidate)
        else:
            # 对于k>1,通过合并相似的(k-1)维单元生成候选
            # 这里简化处理:只检查维度组合
            seen = set()
            for i in range(len(Dk)):
                for j in range(i + 1, len(Dk)):
                    unit1 = Dk[i]  # k维单元
                    unit2 = Dk[j]  # k维单元
                    
                    # 检查是否共享(k-1)个维度
                    dims1 = {dim for dim, _ in unit1}
                    dims2 = {dim for dim, _ in unit2}
                    
                    if len(dims1.intersection(dims2)) == k - 1:
                        # 可以合并
                        merged = sorted(set(unit1) | set(unit2))
                        # 检查所有k维投影是否稠密
                        if self._check_downward_closure(merged, Dk):
                            candidate = tuple(merged)
                            if candidate not in seen:
                                candidates.append(candidate)
                                seen.add(candidate)
        
        return candidates
    
    def _check_downward_closure(self, candidate: Tuple, Dk: List) -> bool:
        """检查向下闭包性:候选单元的所有k维投影是否都在Dk中"""
        k_plus_1 = len(candidate)
        
        # 生成所有k维子投影
        for i in range(k_plus_1):
            # 移除第i个元素得到k维投影
            projection = candidate[:i] + candidate[i+1:]
            
            # 检查投影是否在Dk中
            if projection not in Dk:
                return False
        
        return True
    
    def _count_points_in_unit(self, unit: Tuple) -> int:
        """计算单元中的数据点数"""
        count = 0
        
        # 单元表示:((dim1, interval1), (dim2, interval2), ...)
        # 转换为网格坐标条件
        conditions = {}
        for dim, interval in unit:
            conditions[dim] = interval
        
        # 扫描所有点
        for i, point in enumerate(self.X_normalized):
            in_unit = True
            interval_width = 1.0 / self.xi
            
            for dim, interval in unit:
                # 计算点在该维度的区间索引
                point_interval = int(point[dim] // interval_width)
                if point_interval == self.xi:  # 处理边界
                    point_interval = self.xi - 1
                
                if point_interval != interval:
                    in_unit = False
                    break
            
            if in_unit:
                count += 1
        
        return count
    
    def _form_clusters(self):
        """从稠密单元形成簇"""
        # 在每个维度上,连接相邻的稠密单元
        for dim_size, units in self.dense_units.items():
            if not units:
                continue
                
            # 将稠密单元转换为图结构
            graph = self._build_adjacency_graph(units, dim_size)
            
            # 在图中寻找连通分量
            visited = set()
            for unit in units:
                if tuple(unit) not in visited:
                    cluster = []
                    self._dfs(unit, graph, visited, cluster)
                    if cluster:
                        self.clusters.append((dim_size, cluster))
    
    def _build_adjacency_graph(self, units: List, dim_size: int) -> Dict:
        """构建稠密单元的邻接图"""
        graph = defaultdict(list)
        
        # 转换为可哈希的元组
        unit_tuples = [tuple(unit) for unit in units]
        
        for i, unit1 in enumerate(unit_tuples):
            for j, unit2 in enumerate(unit_tuples[i+1:], i+1):
                if self._are_adjacent(unit1, unit2, dim_size):
                    graph[unit1].append(unit2)
                    graph[unit2].append(unit1)
        
        return graph
    
    def _are_adjacent(self, unit1: Tuple, unit2: Tuple, dim_size: int) -> bool:
        """检查两个单元是否相邻"""
        # 两个单元在每个维度上的区间索引最多相差1
        diff_count = 0
        
        # 转换为字典形式方便比较
        unit1_dict = dict(unit1)
        unit2_dict = dict(unit2)
        
        # 检查所有维度
        all_dims = set(unit1_dict.keys()) | set(unit2_dict.keys())
        
        for dim in all_dims:
            idx1 = unit1_dict.get(dim, -1)
            idx2 = unit2_dict.get(dim, -1)
            
            if idx1 != -1 and idx2 != -1:
                # 两个单元都包含这个维度
                if abs(idx1 - idx2) > 1:
                    return False
                elif abs(idx1 - idx2) == 1:
                    diff_count += 1
            else:
                # 只有一个单元包含这个维度
                # 这种情况不应该发生,除非单元维度不同
                return False
        
        # 至少在一个维度上相邻
        return diff_count >= 1
    
    def _dfs(self, unit: Tuple, graph: Dict, visited: Set, cluster: List):
        """深度优先搜索,寻找连通分量"""
        stack = [unit]
        
        while stack:
            current = stack.pop()
            if current in visited:
                continue
                
            visited.add(current)
            cluster.append(current)
            
            for neighbor in graph.get(current, []):
                if neighbor not in visited:
                    stack.append(neighbor)
    
    def _generate_labels(self) -> np.ndarray:
        """为每个数据点生成簇标签"""
        labels = -np.ones(self.n_samples, dtype=int)  # -1表示噪声
        
        # 为每个簇分配点
        cluster_id = 0
        for dim_size, cluster_units in self.clusters:
            for unit in cluster_units:
                # 找到在这个单元中的所有点
                unit_points = self._get_points_in_unit(unit)
                
                for point_idx in unit_points:
                    if labels[point_idx] == -1:  # 还未分配
                        labels[point_idx] = cluster_id
                    # 注意:一个点可能属于多个簇,这里简单分配第一个遇到的簇
                
            cluster_id += 1
        
        return labels
    
    def _get_points_in_unit(self, unit: Tuple) -> List[int]:
        """获取单元中的所有点索引"""
        points = []
        interval_width = 1.0 / self.xi
        
        for i, point in enumerate(self.X_normalized):
            in_unit = True
            
            for dim, interval in unit:
                point_interval = int(point[dim] // interval_width)
                if point_interval == self.xi:  # 处理边界
                    point_interval = self.xi - 1
                
                if point_interval != interval:
                    in_unit = False
                    break
            
            if in_unit:
                points.append(i)
        
        return points
    
    def get_cluster_rules(self) -> List[Dict]:
        """获取每个簇的描述规则"""
        rules = []
        
        for cluster_id, (dim_size, cluster_units) in enumerate(zip(range(len(self.clusters)), self.clusters)):
            if dim_size >= len(self.clusters):
                continue
                
            _, units = self.clusters[dim_size]
            
            # 为每个簇找到最小覆盖区间
            cluster_rule = {}
            
            # 收集这个簇在所有维度上的区间
            dim_intervals = defaultdict(list)
            for unit in units:
                for dim, interval in unit:
                    dim_intervals[dim].append(interval)
            
            # 为每个维度找到最小和最大区间
            for dim, intervals in dim_intervals.items():
                if intervals:
                    min_interval = min(intervals)
                    max_interval = max(intervals)
                    
                    # 转换为原始数据范围
                    min_val = min_interval * (1.0 / self.xi)
                    max_val = (max_interval + 1) * (1.0 / self.xi)
                    
                    # 反归一化
                    if hasattr(self, 'X_min') and hasattr(self, 'X_range'):
                        min_val_orig = min_val * self.X_range[dim] + self.X_min[dim]
                        max_val_orig = max_val * self.X_range[dim] + self.X_min[dim]
                        cluster_rule[dim] = (min_val_orig, max_val_orig)
                    else:
                        cluster_rule[dim] = (min_val, max_val)
            
            rules.append({
                'cluster_id': cluster_id,
                'dimensions': list(cluster_rule.keys()),
                'rules': cluster_rule,
                'num_points': np.sum(self.labels_ == cluster_id)
            })
        
        return rules

# 三、简化版CLIQUE实现(更高效)

class SimpleCLIQUE:
    """简化版CLIQUE实现,专注于核心逻辑"""
    
    def __init__(self, xi=10, tau=0.1):
        self.xi = xi
        self.tau = tau
        self.labels_ = None
        
    def fit(self, X):
        """简化的CLIQUE实现"""
        n_samples, n_features = X.shape
        
        # 1. 归一化
        X_min = X.min(axis=0)
        X_max = X.max(axis=0)
        X_range = X_max - X_min
        X_range[X_range == 0] = 1.0
        X_norm = (X - X_min) / X_range
        
        # 2. 网格划分
        interval_width = 1.0 / self.xi
        grid = {}
        
        for i in range(n_samples):
            # 计算网格坐标
            coords = tuple(min(int(val // interval_width), self.xi-1) 
                          for val in X_norm[i])
            if coords not in grid:
                grid[coords] = []
            grid[coords].append(i)
        
        # 3. 识别稠密单元
        min_points = self.tau * n_samples
        dense_cells = []
        
        for coords, points in grid.items():
            if len(points) >= min_points:
                dense_cells.append(coords)
        
        # 4. 连接相邻稠密单元形成簇
        clusters = []
        visited = set()
        
        for cell in dense_cells:
            if cell in visited:
                continue
                
            # 新簇
            cluster = []
            queue = deque([cell])
            
            while queue:
                current = queue.popleft()
                if current in visited:
                    continue
                    
                visited.add(current)
                cluster.append(current)
                
                # 找到相邻单元
                for neighbor in self._get_neighbors(current):
                    if neighbor in grid and neighbor in dense_cells and neighbor not in visited:
                        queue.append(neighbor)
            
            if cluster:
                clusters.append(cluster)
        
        # 5. 分配标签
        labels = -np.ones(n_samples, dtype=int)
        
        for i, cluster in enumerate(clusters):
            for cell in cluster:
                for point_idx in grid[cell]:
                    if labels[point_idx] == -1:
                        labels[point_idx] = i
        
        self.labels_ = labels
        self.clusters_ = clusters
        self.grid_ = grid
        self.dense_cells_ = dense_cells
        
        return self
    
    def _get_neighbors(self, cell):
        """获取相邻网格单元"""
        neighbors = []
        for i in range(len(cell)):
            for delta in (-1, 1):
                neighbor = list(cell)
                neighbor[i] += delta
                if 0 <= neighbor[i] < self.xi:
                    neighbors.append(tuple(neighbor))
        return neighbors

# 四、使用示例

def example_usage():
    """CLIQUE算法使用示例"""
    import matplotlib.pyplot as plt
    from sklearn.datasets import make_blobs
    from sklearn.preprocessing import StandardScaler
    
    # 生成示例数据
    X, y_true = make_blobs(n_samples=1000, centers=3, n_features=2, random_state=42)
    
    # 添加一些噪声
    np.random.seed(42)
    noise = np.random.randn(50, 2) * 3
    X = np.vstack([X, noise])
    y_true = np.concatenate([y_true, -np.ones(50)])  # 噪声点标签为-1
    
    # 标准化
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)
    
    # 使用简化版CLIQUE
    clique = SimpleCLIQUE(xi=15, tau=0.05)
    clique.fit(X_scaled)
    
    # 可视化结果
    plt.figure(figsize=(12, 5))
    
    # 真实标签
    plt.subplot(1, 2, 1)
    plt.scatter(X_scaled[:, 0], X_scaled[:, 1], c=y_true, cmap='viridis', s=20, alpha=0.7)
    plt.title("True Clusters")
    plt.xlabel("Feature 1")
    plt.ylabel("Feature 2")
    
    # CLIQUE聚类结果
    plt.subplot(1, 2, 2)
    unique_labels = set(clique.labels_)
    colors = plt.cm.tab20(np.linspace(0, 1, len(unique_labels)))
    
    for k, col in zip(unique_labels, colors):
        if k == -1:
            col = 'gray'  # 噪声点
        
        class_member_mask = (clique.labels_ == k)
        xy = X_scaled[class_member_mask]
        plt.scatter(xy[:, 0], xy[:, 1], c=[col], s=20, alpha=0.7, 
                   label=f'Cluster {k}' if k != -1 else 'Noise')
    
    plt.title(f"CLIQUE Clustering (xi={clique.xi}, tau={clique.tau})")
    plt.xlabel("Feature 1")
    plt.ylabel("Feature 2")
    plt.legend()
    
    plt.tight_layout()
    plt.show()
    
    # 输出聚类统计
    n_clusters = len(set(clique.labels_)) - (1 if -1 in clique.labels_ else 0)
    n_noise = np.sum(clique.labels_ == -1)
    
    print(f"Number of clusters found: {n_clusters}")
    print(f"Number of noise points: {n_noise}")
    print(f"Cluster sizes: {np.bincount(clique.labels_[clique.labels_ >= 0])}")
    
    return clique

# 五、高级功能:CLIQUE用于高维数据

def high_dim_example():
    """高维数据上的CLIQUE示例"""
    from sklearn.datasets import make_classification
    from sklearn.decomposition import PCA
    import matplotlib.pyplot as plt
    
    # 生成高维数据(20维,但簇只存在于部分维度)
    X, y = make_classification(
        n_samples=1000, 
        n_features=20,
        n_informative=5,  # 只有5个特征与聚类相关
        n_redundant=5,
        n_clusters_per_class=1,
        n_classes=3,
        random_state=42
    )
    
    # 使用CLIQUE
    clique = SimpleCLIQUE(xi=10, tau=0.05)
    clique.fit(X)
    
    # 使用PCA降维可视化
    pca = PCA(n_components=2)
    X_pca = pca.fit_transform(X)
    
    # 可视化
    fig, axes = plt.subplots(1, 2, figsize=(12, 5))
    
    # 真实标签
    axes[0].scatter(X_pca[:, 0], X_pca[:, 1], c=y, cmap='viridis', s=20, alpha=0.7)
    axes[0].set_title("True Classes (PCA Projection)")
    axes[0].set_xlabel("PC1")
    axes[0].set_ylabel("PC2")
    
    # CLIQUE结果
    unique_labels = set(clique.labels_)
    colors = plt.cm.tab20(np.linspace(0, 1, len(unique_labels)))
    
    for k, col in zip(unique_labels, colors):
        if k == -1:
            col = 'gray'
        
        mask = (clique.labels_ == k)
        axes[1].scatter(X_pca[mask, 0], X_pca[mask, 1], 
                       c=[col], s=20, alpha=0.7,
                       label=f'Cluster {k}' if k != -1 else 'Noise')
    
    axes[1].set_title(f"CLIQUE Clustering")
    axes[1].set_xlabel("PC1")
    axes[1].set_ylabel("PC2")
    axes[1].legend()
    
    plt.tight_layout()
    plt.show()
    
    # 统计信息
    print("High-dimensional Clustering Results:")
    print(f"Total samples: {X.shape[0]}")
    print(f"Total features: {X.shape[1]}")
    print(f"Clusters found: {len(set(clique.labels_)) - (1 if -1 in clique.labels_ else 0)}")
    print(f"Noise points: {np.sum(clique.labels_ == -1)}")
    
    return clique, X_pca

# 运行示例
if __name__ == "__main__":
    print("=" * 60)
    print("CLIQUE算法实现与示例")
    print("=" * 60)
    
    # 示例1:基本使用
    print("\n1. 二维数据示例:")
    clique_2d = example_usage()
    
    # 示例2:高维数据
    print("\n2. 高维数据示例:")
    clique_hd, X_pca = high_dim_example()

 

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注