Clustering

manify.clustering

Clustering algorithms for Riemannian manifolds. Under construction.

RiemannianFuzzyKMeans(n_clusters, pm, m=2.0, lr=0.1, max_iter=100, tol=0.0001, optimizer='adan', random_state=None, verbose=False)

Bases: BaseEstimator, ClusterMixin

Riemannian Fuzzy K-Means.

Attributes:
  • n_clusters

    The number of clusters to form.

  • pm

    An initialized manifold object (from manifolds.py) on which clustering will be performed.

  • m

    Fuzzifier parameter. Controls the softness of the partition.

  • lr

    Learning rate for the optimizer.

  • max_iter

    Maximum number of iterations for the optimization.

  • tol

    Tolerance for convergence. If the change in loss is less than tol, iteration stops.

  • optimizer

    The optimizer to use for updating cluster centers.

  • random_state

    Seed for random number generation for reproducibility.

  • verbose

    Whether to print loss information during iterations.

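  • losses_

    Loss value recorded at each training iteration, stored as a NumPy array.

  • u_

    Final fuzzy partition matrix of shape (n_points, n_clusters); each row holds one sample's membership in every cluster.

  • labels_

    Hard cluster label for each sample, taken as the argmax of its row in u_.

  • cluster_centers_

    Final cluster centers, stored as a NumPy array.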

Parameters:
  • n_clusters (int) –

    The number of clusters to form.

  • pm (Manifold | ProductManifold) –

    An initialized manifold object (from manifolds.py) on which clustering will be performed.

  • m (float, default: 2.0 ) –

    Fuzzifier parameter. Controls the softness of the partition.

  • lr (float, default: 0.1 ) –

    Learning rate for the optimizer.

  • max_iter (int, default: 100 ) –

    Maximum number of iterations for the optimization.

  • tol (float, default: 0.0001 ) –

    Tolerance for convergence. If the change in loss is less than tol, iteration stops.

  • optimizer (Literal['adan', 'adam'], default: 'adan' ) –

    The optimizer to use for updating cluster centers.

  • random_state (int | None, default: None ) –

    Seed for random number generation for reproducibility.

  • verbose (bool, default: False ) –

    Whether to print loss information during iterations.
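
To make the role of the fuzzifier m more concrete, the following is a minimal pure-Python sketch of the membership formula that appears in the source of fit and predict further down this page. The helper name fuzzy_memberships and the example distances are hypothetical and not part of manify; the sketch also assumes strictly positive distances and m > 1.

```python
def fuzzy_memberships(distances, m=2.0, eps=1e-8):
    """Membership of one point in each cluster, given its distances to the centers.

    Hypothetical helper mirroring the expression used in the library source:
    inv = d ** (-2 / (m - 1)) + eps, then normalize so the row sums to ~1.
    Assumes every distance is strictly positive.
    """
    if m <= 1.0:
        raise ValueError("m must be greater than 1")
    inv = [d ** (-2.0 / (m - 1.0)) + eps for d in distances]
    total = sum(inv) + eps
    return [v / total for v in inv]


# One point that is twice as far from the second center as from the first.
distances = [1.0, 2.0]
for m in (1.5, 2.0, 4.0):
    u = fuzzy_memberships(distances, m=m)
    print(f"m={m}: memberships={[round(x, 3) for x in u]}")
```

Values of m close to 1 push the memberships toward a hard assignment, while larger values spread them more evenly across clusters.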

Source code in manify/clustering/fuzzy_kmeans.py
def __init__(
    self,
    n_clusters: int,
    pm: Manifold | ProductManifold,
    m: float = 2.0,
    lr: float = 0.1,
    max_iter: int = 100,
    tol: float = 1e-4,
    optimizer: Literal["adan", "adam"] = "adan",
    random_state: int | None = None,
    verbose: bool = False,
):
    self.n_clusters = n_clusters
    self.pm = pm
    self.m = m
    self.lr = lr
    self.max_iter = max_iter
    self.tol = tol
    if optimizer not in ("adan", "adam"):
        raise ValueError("optimizer must be 'adan' or 'adam'")
    self.optimizer = optimizer
    self.random_state = random_state
    self.verbose = verbose

fit(X, y=None)

Fit the Riemannian Fuzzy K-Means model to the data X.

Parameters:
  • X (Float[Tensor, 'n_points n_features']) –

    Input data of shape (n_points, n_features). n_features must equal the manifold's ambient dimension (pm.ambient_dim).

  • y (None, default: None ) –

    Ignored, present for compatibility with scikit-learn's API.

Returns:
  • self( 'RiemannianFuzzyKMeans' ) –

    Fitted RiemannianFuzzyKMeans instance.

Raises:
  • ValueError

    If the input data's dimension does not match the manifold's ambient dimension.

  • RuntimeError

    If the optimizer is not set correctly or if the model has not been initialized properly.
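
The stopping rule controlled by tol and max_iter can be sketched in isolation. The example below is an illustrative assumption, not library code: run_until_converged and make_toy_step are hypothetical names, and a one-dimensional quadratic stands in for the clustering loss. As in the fit source, the loss is recorded before the parameter update, and the loop stops once two consecutive losses differ by less than tol.

```python
def run_until_converged(step, max_iter=100, tol=1e-4):
    """Call step() until the loss changes by less than tol or max_iter is reached."""
    losses = []
    for i in range(max_iter):
        losses.append(step())
        # Same test as in fit: compare the two most recent loss values.
        if i > 0 and abs(losses[-1] - losses[-2]) < tol:
            break
    return losses


def make_toy_step(x0=4.0, lr=0.25):
    """Gradient descent on f(x) = x**2; a stand-in for one optimizer step."""
    state = {"x": x0}

    def step():
        x = state["x"]
        loss = x * x                    # loss at the current parameter
        state["x"] = x - lr * 2.0 * x   # gradient step, f'(x) = 2x
        return loss

    return step


losses = run_until_converged(make_toy_step())
print(f"stopped after {len(losses)} iterations, last loss {losses[-1]:.2e}")
```

A smaller tol lets the loop run longer before stopping; max_iter caps the cost when the loss keeps changing.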

Source code in manify/clustering/fuzzy_kmeans.py
def fit(self, X: Float[torch.Tensor, "n_points n_features"], y: None = None) -> "RiemannianFuzzyKMeans":
    """Fit the Riemannian Fuzzy K-Means model to the data X.

    Args:
        X: Input data. Features should match the manifold's geometry.
        y: Ignored, present for compatibility with scikit-learn's API.

    Returns:
        self: Fitted `RiemannianFuzzyKMeans` instance.

    Raises:
        ValueError: If the input data's dimension does not match the manifold's ambient dimension.
        RuntimeError: If the optimizer is not set correctly or if the model has not been initialized properly.
    """
    if isinstance(X, np.ndarray):
        X = torch.from_numpy(X).type(torch.get_default_dtype())
    elif not isinstance(X, torch.Tensor):
        X = torch.tensor(X, dtype=torch.get_default_dtype())

    # Ensure X is on the same device as the manifold
    X = X.to(self.pm.device)

    if X.shape[1] != self.pm.ambient_dim:
        raise ValueError(
            f"Input data X's dimension ({X.shape[1]}) in fit() does not match "
            f"the manifold's ambient dimension ({self.pm.ambient_dim})."
        )

    self._init_centers(X)
    m, tol = self.m, self.tol
    losses = []
    for i in range(self.max_iter):
        self.opt_.zero_grad()
        # self.pm.dist is implemented in manifolds.py and handles broadcasting
        d = self.pm.dist(X, self.mu_)  # X is (N,D), mu_ is (K,D) -> d is (N,K)
        # Original RFK: d = self.pm.dist(X.unsqueeze(1), self.mu_.unsqueeze(0))
        # The .dist in manifolds.py uses X[:, None] and Y[None, :], so direct call should work if mu_ is (K,D)

        S = torch.sum(d.pow(-2 / (m - 1)) + 1e-8, dim=1)  # Add epsilon for stability
        loss = torch.sum(S.pow(1 - m))
        loss.backward()
        losses.append(loss.item())
        self.opt_.step()
        if self.verbose:
            print(f"RFK iter {i + 1}, loss={loss.item():.4f}")
        if i > 0 and abs(losses[-1] - losses[-2]) < tol:
            break

    # save the result
    self.losses_ = np.array(losses)
    with torch.no_grad():  # Ensure no gradients are computed for final calculations
        dfin = self.pm.dist(X, self.mu_)  # Re-calculate dist to final centers
        inv = dfin.pow(-2 / (m - 1)) + 1e-8  # Add epsilon
        u_final = inv / (inv.sum(dim=1, keepdim=True) + 1e-8)  # Add epsilon
    self.u_ = u_final.detach().cpu().numpy()
    self.labels_ = np.argmax(self.u_, axis=1)
    self.cluster_centers_ = self.mu_.data.clone().detach().cpu().numpy()
    return self
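
Read literally from the loop above, and ignoring the 1e-8 stabilizers, the quantity being minimized and the memberships computed at the end can be written as follows. The symbols x_i (sample), \mu_k (cluster center), N, K, and d (the manifold distance returned by pm.dist) are notation introduced here for illustration; this is a transcription of the code, not a statement taken from the paper.

```latex
\mathcal{L}(\mu_1,\dots,\mu_K)
  = \sum_{i=1}^{N} \Bigl( \sum_{k=1}^{K} d(x_i,\mu_k)^{-\frac{2}{m-1}} \Bigr)^{1-m},
\qquad
u_{ik} = \frac{d(x_i,\mu_k)^{-\frac{2}{m-1}}}
              {\sum_{j=1}^{K} d(x_i,\mu_j)^{-\frac{2}{m-1}}}
```

With the default m = 2 the exponents reduce to -2 and -1, so each sample contributes the reciprocal of the sum of its inverse squared distances to the centers.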

predict(X)

Predict the closest cluster each sample in X belongs to.

Parameters:
  • X (Float[Tensor, 'n_points n_features']) –

    Input data of shape (n_points, n_features). n_features must equal the manifold's ambient dimension (pm.ambient_dim).

Returns:
  • labels( Int[Tensor, 'n_points'] ) –

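    Cluster labels for each sample in X. Although the annotation says Tensor, the source code for this method converts the labels to a NumPy array before returning them.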

Raises:
  • ValueError

    If the input data's dimension does not match the manifold's ambient dimension.

  • RuntimeError

    If the model has not been fitted yet.

Source code in manify/clustering/fuzzy_kmeans.py
def predict(self, X: Float[torch.Tensor, "n_points n_features"]) -> Int[torch.Tensor, "n_points"]:
    """Predict the closest cluster each sample in X belongs to.

    Args:
        X: Input data. Features should match the manifold's geometry.

    Returns:
        labels: Cluster labels for each sample in X.

    Raises:
        ValueError: If the input data's dimension does not match the manifold's ambient dimension.
        RuntimeError: If the model has not been fitted yet.
    """
    if isinstance(X, np.ndarray):
        X = torch.from_numpy(X).type(torch.get_default_dtype())
    elif not isinstance(X, torch.Tensor):
        X = torch.tensor(X, dtype=torch.get_default_dtype())

    # Ensure X is on the same device as the manifold
    X = X.to(self.pm.device)

    if X.shape[1] != self.pm.ambient_dim:
        raise ValueError(
            f"Input data X's dimension ({X.shape[1]}) in predict() does not match "
            f"the manifold's ambient dimension ({self.pm.ambient_dim})."
        )

    if not hasattr(self, "mu_") or self.mu_ is None:
        raise RuntimeError("The RFK model has not been fitted yet. Call 'fit' before 'predict'.")

    with torch.no_grad():
        dmat = self.pm.dist(X, self.mu_)  # X is (N,D), mu_ is (K,D) -> dmat is (N,K)
        inv = dmat.pow(-2 / (self.m - 1)) + 1e-8  # Add epsilon
        u = inv / (inv.sum(dim=1, keepdim=True) + 1e-8)  # Add epsilon
        labels = torch.argmax(u, dim=1).cpu().numpy()
    return labels
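
Because the membership decreases as the distance grows, taking the argmax of the memberships selects the nearest center. The sketch below checks this on a small hand-written distance matrix. It is plain Python with hypothetical names (hard_labels, dists), assumes strictly positive distances, and does not call manify.

```python
def hard_labels(dist_rows, m=2.0, eps=1e-8):
    """Return the index of the highest-membership cluster for each row of distances."""
    labels = []
    for row in dist_rows:
        inv = [d ** (-2.0 / (m - 1.0)) + eps for d in row]
        total = sum(inv) + eps
        u = [v / total for v in inv]
        labels.append(max(range(len(u)), key=u.__getitem__))
    return labels


# Rows are samples, columns are distances to three cluster centers.
dists = [
    [0.5, 2.0, 3.0],
    [4.0, 1.0, 2.5],
    [3.0, 2.0, 0.1],
]
labels = hard_labels(dists)
nearest = [min(range(len(row)), key=row.__getitem__) for row in dists]
print(labels, nearest)
```

The two lists agree, which is why the method is described as predicting the closest cluster even though it goes through the fuzzy memberships.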

fuzzy_kmeans

Riemannian Fuzzy K-Means.

The Riemannian Fuzzy K-Means (RFK) algorithm clusters data that lies on a Riemannian manifold. Compared with a direct extension of K-Means or Fuzzy K-Means to Riemannian manifolds, it is reported to run significantly faster while reaching a lower loss. For details, see the paper: https://openreview.net/forum?id=9VmOgMN4Ie

If you find this work useful, please cite the paper as follows:

@article{Yuan2025,
  title={Riemannian Fuzzy K-Means},
  author={Anonymous},
  journal={OpenReview},
  year={2025},
  url={https://openreview.net/forum?id=9VmOgMN4Ie}
}

If you have questions about the code, feel free to contact: yuanjinghuiiii@gmail.com.
