# -*- coding: utf-8 -*-
import os
from typing import Dict, Optional, Union
import mlflow
import numpy as np
import pandas as pd
from rich import print
from sklearn import metrics
from sklearn.cluster import DBSCAN, AffinityPropagation, KMeans
from ..constants import MLFLOW_ARTIFACT_DATA_PATH, MLFLOW_ARTIFACT_IMAGE_MODEL_OUTPUT_PATH
from ..utils.base import clear_output, save_data, save_fig
from ._base import WorkflowBase
from .func.algo_clustering._dbscan import dbscan_manual_hyper_parameters, dbscan_result_plot
from .func.algo_clustering._kmeans import kmeans_manual_hyper_parameters, plot_silhouette_diagram, scatter2d, scatter3d
[docs]
class ClusteringWorkflowBase(WorkflowBase):
"""The base workflow class of clustering algorithms."""
common_function = ["Cluster Centers", "Cluster Labels", "Model Persistence"]
def __init__(self):
super().__init__()
self.clustering_result = None
[docs]
def fit(self, X: pd.DataFrame, y: Optional[pd.DataFrame] = None) -> None:
"""Fit the model according to the given training data."""
self.X = X
self.model.fit(X)
mlflow.log_params(self.model.get_params())
[docs]
@classmethod
def manual_hyper_parameters(cls) -> Dict:
"""Manual hyper-parameters specification."""
return dict()
# TODO(Samson 1057266013@qq.com): This function might need to be rethought.
[docs]
def get_cluster_centers(self) -> np.ndarray:
"""Get the cluster centers."""
print("-----* Clustering Centers *-----")
print(getattr(self.model, "cluster_centers_", "This class don not have cluster_centers_"))
return getattr(self.model, "cluster_centers_", "This class don not have cluster_centers_")
[docs]
def get_labels(self):
"""Get the cluster labels."""
print("-----* Clustering Labels *-----")
# self.X['clustering result'] = self.model.labels_
self.clustering_result = pd.DataFrame(self.model.labels_, columns=["clustering result"])
print(self.clustering_result)
GEOPI_OUTPUT_ARTIFACTS_DATA_PATH = os.getenv("GEOPI_OUTPUT_ARTIFACTS_DATA_PATH")
save_data(self.clustering_result, f"{self.naming} Result", GEOPI_OUTPUT_ARTIFACTS_DATA_PATH, MLFLOW_ARTIFACT_DATA_PATH)
[docs]
class KMeansClustering(ClusteringWorkflowBase):
"""The automation workflow of using KMeans algorithm to make insightful products."""
name = "KMeans"
special_function = ["KMeans Score"]
def __init__(
self,
n_clusters: int = 8,
init: str = "k-means++",
n_init: int = 10,
max_iter: int = 300,
tol: float = 1e-4,
verbose: int = 0,
random_state: Optional[int] = None,
copy_x: bool = True,
algorithm: str = "auto",
) -> None:
"""
Parameters
----------
n_clusters : int, default=8
The number of clusters to form as well as the number of
centroids to generate.
init : {'k-means++', 'random'}, callable or array-like of shape \
(n_clusters, n_features), default='k-means++'
Method for initialization:
'k-means++' : selects initial cluster centers for k-mean
clustering in a smart way to speed up convergence. See section
Notes in k_init for more details.
'random': choose `n_clusters` observations (rows) at random from data
for the initial centroids.
If an array is passed, it should be of shape (n_clusters, n_features)
and gives the initial centers.
If a callable is passed, it should take arguments X, n_clusters and a
random state and return an initialization.
n_init : int, default=10
Number of time the k-means algorithm will be run with different
centroid seeds. The final results will be the best output of
n_init consecutive runs in terms of inertia.
max_iter : int, default=300
Maximum number of iterations of the k-means algorithm for a
single run.
tol : float, default=1e-4
Relative tolerance with regards to Frobenius norm of the difference
in the cluster centers of two consecutive iterations to declare
convergence.
verbose : int, default=0
Verbosity mode.
random_state : int, RandomState instance or None, default=None
Determines random number generation for centroid initialization. Use
an int to make the randomness deterministic.
See :term:`Glossary <random_state>`.
copy_x : bool, default=True
When pre-computing distances it is more numerically accurate to center
the data first. If copy_x is True (default), then the original data is
not modified. If False, the original data is modified, and put back
before the function returns, but small numerical differences may be
introduced by subtracting and then adding the data mean. Note that if
the original data is not C-contiguous, a copy will be made even if
copy_x is False. If the original data is sparse, but not in CSR format,
a copy will be made even if copy_x is False.
algorithm : {"auto", "full", "elkan"}, default="auto"
K-means algorithm to use. The classical EM-style algorithm is "full".
The "elkan" variation is more efficient on data with well-defined
clusters, by using the triangle inequality. However it's more memory
intensive due to the allocation of an extra array of shape
(n_samples, n_clusters).
For now "auto" (kept for backward compatibility) chooses "elkan" but it
might change in the future for a better heuristic.
References
----------------------------------------
Scikit-learn API: sklearn.cluster.KMeans
https://scikit-learn.org/stable/modules/generated/sklearn.cluster.KMeans.html
"""
super().__init__()
self.n_clusters = n_clusters
self.init = init
self.max_iter = max_iter
self.tol = tol
self.n_init = n_init
self.verbose = verbose
self.random_state = random_state
self.copy_x = copy_x
self.algorithm = algorithm
self.model = KMeans(
n_clusters=self.n_clusters,
init=self.init,
n_init=self.n_init,
max_iter=self.max_iter,
tol=self.tol,
verbose=self.verbose,
random_state=self.random_state,
copy_x=self.copy_x,
algorithm=self.algorithm,
)
self.naming = KMeansClustering.name
[docs]
@classmethod
def manual_hyper_parameters(cls) -> Dict:
"""Manual hyper-parameters specification."""
print(f"-*-*- {cls.name} - Hyper-parameters Specification -*-*-")
hyper_parameters = kmeans_manual_hyper_parameters()
clear_output()
return hyper_parameters
def _get_scores(self):
"""Get the scores of the clustering result."""
print("-----* KMeans Scores *-----")
print("Inertia Score: ", self.model.inertia_)
print("Calinski Harabasz Score: ", metrics.calinski_harabasz_score(self.X, self.model.labels_))
print("Silhouette Score: ", metrics.silhouette_score(self.X, self.model.labels_))
mlflow.log_metric("Inertia Score", self.model.inertia_)
mlflow.log_metric("Calinski Harabasz Score", metrics.calinski_harabasz_score(self.X, self.model.labels_))
mlflow.log_metric("Silhouette Score", metrics.silhouette_score(self.X, self.model.labels_))
@staticmethod
def _plot_silhouette_diagram(
data: pd.DataFrame,
cluster_labels: pd.DataFrame,
cluster_centers_: np.ndarray,
n_clusters: int,
algorithm_name: str,
local_path: str,
mlflow_path: str,
) -> None:
"""Plot the silhouette diagram of the clustering result."""
print("-----* Silhouette Diagram *-----")
plot_silhouette_diagram(data, cluster_labels, cluster_centers_, n_clusters, algorithm_name)
save_fig(f"Silhouette Diagram - {algorithm_name}", local_path, mlflow_path)
data_with_labels = pd.concat([data, cluster_labels], axis=1)
save_data(data_with_labels, "Silhouette Diagram - Data With Labels", local_path, mlflow_path)
cluster_center_data = pd.DataFrame(cluster_centers_, columns=data.columns)
save_data(cluster_center_data, "Silhouette Diagram - Cluster Centers", local_path, mlflow_path)
@staticmethod
def _scatter2d(data: pd.DataFrame, cluster_labels: pd.DataFrame, algorithm_name: str, local_path: str, mlflow_path: str) -> None:
"""Plot the two-dimensional diagram of the clustering result."""
print("-----* Cluster Two-Dimensional Diagram *-----")
scatter2d(data, cluster_labels, algorithm_name)
save_fig(f"Cluster Two-Dimensional Diagram - {algorithm_name}", local_path, mlflow_path)
data_with_labels = pd.concat([data, cluster_labels], axis=1)
save_data(data_with_labels, f"Cluster Two-Dimensional Diagram - {algorithm_name}", local_path, mlflow_path)
@staticmethod
def _scatter3d(data: pd.DataFrame, cluster_labels: pd.DataFrame, algorithm_name: str, local_path: str, mlflow_path: str) -> None:
"""Plot the three-dimensional diagram of the clustering result."""
print("-----* Cluster Three-Dimensional Diagram *-----")
scatter3d(data, cluster_labels, algorithm_name)
save_fig(f"Cluster Three-Dimensional Diagram - {algorithm_name}", local_path, mlflow_path)
data_with_labels = pd.concat([data, cluster_labels], axis=1)
save_data(data_with_labels, f"Cluster Two-Dimensional Diagram - {algorithm_name}", local_path, mlflow_path)
[docs]
def special_components(self, **kwargs: Union[Dict, np.ndarray, int]) -> None:
"""Invoke all special application functions for this algorithms by Scikit-learn framework."""
GEOPI_OUTPUT_ARTIFACTS_IMAGE_MODEL_OUTPUT_PATH = os.getenv("GEOPI_OUTPUT_ARTIFACTS_IMAGE_MODEL_OUTPUT_PATH")
self._get_scores()
self._plot_silhouette_diagram(
data=self.X,
cluster_labels=self.clustering_result["clustering result"],
cluster_centers_=self.get_cluster_centers(),
n_clusters=self.n_clusters,
algorithm_name=self.naming,
local_path=GEOPI_OUTPUT_ARTIFACTS_IMAGE_MODEL_OUTPUT_PATH,
mlflow_path=MLFLOW_ARTIFACT_IMAGE_MODEL_OUTPUT_PATH,
)
# Draw graphs when the number of principal components > 3
if self.X.shape[1] >= 3:
# choose two of dimensions to draw
two_dimen_axis_index, two_dimen_data = self.choose_dimension_data(self.X, 2)
self._scatter2d(
data=two_dimen_data,
cluster_labels=self.clustering_result["clustering result"],
algorithm_name=self.naming,
local_path=GEOPI_OUTPUT_ARTIFACTS_IMAGE_MODEL_OUTPUT_PATH,
mlflow_path=MLFLOW_ARTIFACT_IMAGE_MODEL_OUTPUT_PATH,
)
# choose three of dimensions to draw
three_dimen_axis_index, three_dimen_data = self.choose_dimension_data(self.X, 3)
self._scatter3d(
data=three_dimen_data,
cluster_labels=self.clustering_result["clustering result"],
algorithm_name=self.naming,
local_path=GEOPI_OUTPUT_ARTIFACTS_IMAGE_MODEL_OUTPUT_PATH,
mlflow_path=MLFLOW_ARTIFACT_IMAGE_MODEL_OUTPUT_PATH,
)
elif self.X.shape[1] == 3:
# choose two of dimensions to draw
two_dimen_axis_index, two_dimen_data = self.choose_dimension_data(self.X, 2)
self._scatter2d(
data=two_dimen_data,
cluster_labels=self.clustering_result["clustering result"],
algorithm_name=self.naming,
local_path=GEOPI_OUTPUT_ARTIFACTS_IMAGE_MODEL_OUTPUT_PATH,
mlflow_path=MLFLOW_ARTIFACT_IMAGE_MODEL_OUTPUT_PATH,
)
# no need to choose
self._scatter3d(
data=self.X,
cluster_labels=self.clustering_result["clustering result"],
algorithm_name=self.naming,
local_path=GEOPI_OUTPUT_ARTIFACTS_IMAGE_MODEL_OUTPUT_PATH,
mlflow_path=MLFLOW_ARTIFACT_IMAGE_MODEL_OUTPUT_PATH,
)
elif self.X.shape[1] == 2:
self._scatter2d(
data=self.X,
cluster_labels=self.clustering_result["clustering result"],
algorithm_name=self.naming,
local_path=GEOPI_OUTPUT_ARTIFACTS_IMAGE_MODEL_OUTPUT_PATH,
mlflow_path=MLFLOW_ARTIFACT_IMAGE_MODEL_OUTPUT_PATH,
)
else:
pass
[docs]
class DBSCANClustering(ClusteringWorkflowBase):
"""The automation workflow of using DBSCAN algorithm to make insightful products."""
name = "DBSCAN"
special_function = ["Virtualization of Result in 2D Graph"]
def __init__(
self,
eps: float = 0.5,
min_samples: int = 5,
metric: str = "euclidean",
metric_params: Optional[Dict] = None,
algorithm: str = "auto",
leaf_size: int = 30,
p: float = None,
n_jobs: int = None,
) -> None:
"""
Parameters
----------
eps : float, default=0.5
The maximum distance between two samples for one to be considered as in the neighborhood of the other. This is not a maximum bound on the distances of points within a cluster.
This is the most important DBSCAN parameter to choose appropriately for your data set and distance function.
min_samples : int, default=5
The number of samples (or total weight) in a neighborhood for a point to be considered as a core point. This includes the point itself.
metric : str, or callable, default=`euclidean`
The metric to use when calculating distance between instances in a feature array. If metric is a string or callable, it must be one of the options allowed
by sklearn.metrics.pairwise_distances for its metric parameter.
If metric is “precomputed”, X is assumed to be a distance matrix and must be square. X may be a sparse graph, in which case only “nonzero” elements may be considered neighbors for DBSCAN.
New in version 0.17: metric precomputed to accept precomputed sparse matrix.
metric_params : dict, default=None
Additional keyword arguments for the metric function.
New in version 0.19.
algorithm : {`auto`, `ball_tree`, `kd_tree`, `brute`}, default=`auto`
The algorithm to be used by the NearestNeighbors module to compute pointwise distances and find nearest neighbors. See NearestNeighbors module documentation for details.
leaf_size : int, default=30
Leaf size passed to BallTree or cKDTree. This can affect the speed of the construction and query, as well as the memory required to store the tree. The optimal value depends
on the nature of the problem.
p : float, default=None
The power of the Minkowski metric to be used to calculate distance between points. If None, then p=2 (equivalent to the Euclidean distance).
n_jobs : int, default=None
The number of parallel jobs to run. None means 1 unless in a joblib.parallel_backend context. -1 means using all processors. See Glossary for more details.
References
----------------------------------------
Scikit-learn API: sklearn.cluster.DBSCAN
https://scikit-learn.org/stable/modules/generated/sklearn.cluster.DBSCAN.html
"""
super().__init__()
self.eps = eps
self.min_samples = min_samples
self.metric = metric
self.metric_params = metric_params
self.algorithm = algorithm
self.leaf_size = leaf_size
self.p = p
self.n_jobs = n_jobs
self.model = DBSCAN(
eps=self.eps,
min_samples=self.min_samples,
metric=self.metric,
metric_params=self.metric_params,
algorithm=self.algorithm,
leaf_size=self.leaf_size,
p=self.p,
n_jobs=self.n_jobs,
)
self.naming = DBSCANClustering.name
[docs]
@classmethod
def manual_hyper_parameters(cls) -> Dict:
"""Manual hyper-parameters specification."""
print(f"-*-*- {cls.name} - Hyper-parameters Specification -*-*-")
hyper_parameters = dbscan_manual_hyper_parameters()
clear_output()
return hyper_parameters
@staticmethod
def _clustering_result_plot(X: pd.DataFrame, trained_model: any, algorithm_name: str, imag_config: dict, local_path: str, mlflow_path: str) -> None:
"""Plot the clustering result in 2D graph."""
print("-------** Cluster Two-Dimensional Diagram **----------")
dbscan_result_plot(X, trained_model, imag_config, algorithm_name)
save_fig(f"Cluster Two-Dimensional Diagram - {algorithm_name}", local_path, mlflow_path)
save_data(X, f"Cluster Two-Dimensional Diagram - {algorithm_name}", local_path, mlflow_path)
[docs]
def special_components(self, **kwargs: Union[Dict, np.ndarray, int]) -> None:
"""Invoke all special application functions for this algorithms by Scikit-learn framework."""
GEOPI_OUTPUT_ARTIFACTS_IMAGE_MODEL_OUTPUT_PATH = os.getenv("GEOPI_OUTPUT_ARTIFACTS_IMAGE_MODEL_OUTPUT_PATH")
self._clustering_result_plot(
X=self.X,
trained_model=self.model,
algorithm_name=self.naming,
imag_config=self.image_config,
local_path=GEOPI_OUTPUT_ARTIFACTS_IMAGE_MODEL_OUTPUT_PATH,
mlflow_path=MLFLOW_ARTIFACT_IMAGE_MODEL_OUTPUT_PATH,
)
[docs]
class AffinityPropagationClustering(ClusteringWorkflowBase):
name = "AffinityPropagation"
def __init__(
self,
*,
damping=0.5,
max_iter=200,
convergence_iter=15,
copy=True,
preference=None,
affinity="euclidean",
verbose=False,
random_state=None,
):
super().__init__()
self.damping = damping
self.max_iter = max_iter
self.convergence_iter = convergence_iter
self.copy = copy
self.verbose = verbose
self.preference = preference
self.affinity = affinity
self.random_state = random_state
self.model = AffinityPropagation(
damping=self.damping,
max_iter=self.max_iter,
convergence_iter=self.convergence_iter,
copy=self.copy,
preference=None,
affinity="euclidean",
verbose=False,
random_state=None,
)
self.naming = AffinityPropagationClustering.name
pass
[docs]
class MeanShiftClustering(ClusteringWorkflowBase):
name = "MeanShift"
pass
[docs]
class SpectralClustering(ClusteringWorkflowBase):
name = "Spectral"
pass
[docs]
class WardHierarchicalClustering(ClusteringWorkflowBase):
name = "WardHierarchical"
pass
[docs]
class AgglomerativeClustering(ClusteringWorkflowBase):
name = "Agglomerative"
pass
[docs]
class OPTICSClustering(ClusteringWorkflowBase):
name = "OPTICS"
pass
[docs]
class GaussianMixturesClustering(ClusteringWorkflowBase):
name = "GaussianMixtures"
pass
[docs]
class BIRCHClusteringClustering(ClusteringWorkflowBase):
name = "BIRCHClustering"
pass
[docs]
class BisectingKMeansClustering(ClusteringWorkflowBase):
name = "BisectingKMeans"
pass