# gis-classification/src/strategies/classifiers.py

"""Built-in classification strategies."""
from typing import Any
import numpy as np
from scipy.stats import multivariate_normal
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.linear_model import LogisticRegression
from .base import ClassificationStrategy
class RandomForestStrategy(ClassificationStrategy):
    """Classification strategy backed by scikit-learn's RandomForestClassifier."""

    def __init__(
        self,
        n_estimators: int = 100,
        max_depth: int | None = None,
        random_state: int = 42,
        **kwargs
    ):
        # Tracked hyper-parameters, exposed later via get_params().
        self.n_estimators = n_estimators
        self.max_depth = max_depth
        self.random_state = random_state
        # Merge tracked settings with any pass-through keyword arguments.
        model_kwargs = dict(
            n_estimators=n_estimators,
            max_depth=max_depth,
            random_state=random_state,
        )
        model_kwargs.update(kwargs)
        self._clf = RandomForestClassifier(**model_kwargs)

    def train(self, X: np.ndarray, y: np.ndarray) -> None:
        """Fit the underlying forest on the training data."""
        self._clf.fit(X, y)

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Return predicted class labels for X."""
        return self._clf.predict(X)

    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        """Return per-class probability estimates for X."""
        return self._clf.predict_proba(X)

    def get_params(self) -> dict[str, Any]:
        """Return the tracked hyper-parameters."""
        return {
            "n_estimators": self.n_estimators,
            "max_depth": self.max_depth,
            "random_state": self.random_state,
        }

    @property
    def name(self) -> str:
        return "RandomForest"
class SVMStrategy(ClassificationStrategy):
    """Classification strategy backed by scikit-learn's SVC."""

    def __init__(
        self,
        kernel: str = "rbf",
        C: float = 1.0,
        gamma: str = "scale",
        random_state: int = 42,
        **kwargs
    ):
        # Tracked hyper-parameters, exposed later via get_params().
        self.kernel = kernel
        self.C = C
        self.gamma = gamma
        self.random_state = random_state
        # probability=True enables predict_proba on the underlying SVC.
        model_kwargs = dict(
            kernel=kernel,
            C=C,
            gamma=gamma,
            random_state=random_state,
            probability=True,
        )
        model_kwargs.update(kwargs)
        self._clf = SVC(**model_kwargs)

    def train(self, X: np.ndarray, y: np.ndarray) -> None:
        """Fit the underlying SVM on the training data."""
        self._clf.fit(X, y)

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Return predicted class labels for X."""
        return self._clf.predict(X)

    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        """Return per-class probability estimates for X."""
        return self._clf.predict_proba(X)

    def get_params(self) -> dict[str, Any]:
        """Return the tracked hyper-parameters."""
        return {
            "kernel": self.kernel,
            "C": self.C,
            "gamma": self.gamma,
            "random_state": self.random_state,
        }

    @property
    def name(self) -> str:
        return "SVM"
class LogisticRegressionStrategy(ClassificationStrategy):
    """Classification strategy backed by scikit-learn's LogisticRegression."""

    def __init__(
        self,
        penalty: str = "l2",
        C: float = 1.0,
        max_iter: int = 1000,
        random_state: int = 42,
        **kwargs
    ):
        # Tracked hyper-parameters, exposed later via get_params().
        self.penalty = penalty
        self.C = C
        self.max_iter = max_iter
        self.random_state = random_state
        # Merge tracked settings with any pass-through keyword arguments.
        model_kwargs = dict(
            penalty=penalty,
            C=C,
            max_iter=max_iter,
            random_state=random_state,
        )
        model_kwargs.update(kwargs)
        self._clf = LogisticRegression(**model_kwargs)

    def train(self, X: np.ndarray, y: np.ndarray) -> None:
        """Fit the underlying model on the training data."""
        self._clf.fit(X, y)

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Return predicted class labels for X."""
        return self._clf.predict(X)

    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        """Return per-class probability estimates for X."""
        return self._clf.predict_proba(X)

    def get_params(self) -> dict[str, Any]:
        """Return the tracked hyper-parameters."""
        return {
            "penalty": self.penalty,
            "C": self.C,
            "max_iter": self.max_iter,
            "random_state": self.random_state,
        }

    @property
    def name(self) -> str:
        return "LogisticRegression"
class MLEStrategy(ClassificationStrategy):
    """Maximum Likelihood Estimation classification strategy.

    Assumes each class follows a multivariate normal distribution.
    Classic algorithm for GIS/remote sensing classification. Class scores
    are log-posteriors: the Gaussian log-density plus the log class prior
    estimated from training-set frequencies.
    """

    def __init__(self, reg_covar: float = 1e-6):
        """Initialize MLE classifier.

        Args:
            reg_covar: Regularization added to the covariance diagonal
                for numerical stability.
        """
        self.reg_covar = reg_covar
        # Per-class Gaussian parameters; populated by train().
        self._means: dict[Any, np.ndarray] = {}
        self._covs: dict[Any, np.ndarray] = {}
        self._priors: dict[Any, float] = {}
        self._classes: np.ndarray | None = None

    def train(self, X: np.ndarray, y: np.ndarray) -> None:
        """Estimate mean, covariance and prior for each class.

        Args:
            X: Training samples, shape (n_samples, n_features).
            y: Class labels, shape (n_samples,).
        """
        self._classes = np.unique(y)
        self._means = {}
        self._covs = {}
        self._priors = {}
        n_samples = len(y)
        n_features = X.shape[1]
        for cls in self._classes:
            X_cls = X[y == cls]
            # Prior probability from class frequency.
            self._priors[cls] = len(X_cls) / n_samples
            # Mean vector.
            self._means[cls] = np.mean(X_cls, axis=0)
            # Covariance matrix. np.cov uses an N-1 divisor, so a class
            # with a single sample would yield an all-NaN matrix that
            # reg_covar cannot repair; fall back to a zero matrix (the
            # regularization below makes it positive definite).
            if len(X_cls) < 2:
                cov = np.zeros((n_features, n_features))
            else:
                cov = np.cov(X_cls, rowvar=False)
                if cov.ndim == 0:
                    # Single feature: np.cov returns a 0-d scalar.
                    cov = np.array([[cov]])
            cov = cov + np.eye(cov.shape[0]) * self.reg_covar
            self._covs[cls] = cov

    def _compute_log_likelihood(self, X: np.ndarray, cls: Any) -> np.ndarray:
        """Compute the log-posterior score (log-density + log-prior) for one class."""
        mean = self._means[cls]
        cov = self._covs[cls]
        prior = self._priors[cls]
        try:
            rv = multivariate_normal(mean=mean, cov=cov, allow_singular=True)
            log_likelihood = rv.logpdf(X)
        except Exception:
            # Fallback: evaluate the Gaussian log-density manually when
            # scipy rejects the covariance matrix.
            diff = X - mean
            try:
                cov_inv = np.linalg.inv(cov)
            except np.linalg.LinAlgError:
                # Pseudo-inverse handles singular covariance matrices.
                cov_inv = np.linalg.pinv(cov)
            mahalanobis = np.sum(diff @ cov_inv * diff, axis=1)
            log_det = np.linalg.slogdet(cov)[1]
            log_likelihood = -0.5 * (X.shape[1] * np.log(2 * np.pi) + log_det + mahalanobis)
        return log_likelihood + np.log(prior)

    def _score_matrix(self, X: np.ndarray) -> np.ndarray:
        """Stack per-class log-posterior scores into an (n_samples, n_classes) array.

        Raises:
            RuntimeError: If called before train().
        """
        if self._classes is None:
            raise RuntimeError("Classifier not trained")
        scores = np.zeros((X.shape[0], len(self._classes)))
        for i, cls in enumerate(self._classes):
            scores[:, i] = self._compute_log_likelihood(X, cls)
        return scores

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Predict the class with the maximum posterior score.

        Raises:
            RuntimeError: If called before train().
        """
        scores = self._score_matrix(X)
        return self._classes[np.argmax(scores, axis=1)]

    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        """Predict class probabilities using softmax of the log-posterior scores.

        Raises:
            RuntimeError: If called before train().
        """
        scores = self._score_matrix(X)
        # Shift by the row max for numerical stability before exponentiating.
        scores -= np.max(scores, axis=1, keepdims=True)
        exp_scores = np.exp(scores)
        return exp_scores / np.sum(exp_scores, axis=1, keepdims=True)

    def get_params(self) -> dict[str, Any]:
        """Return the tracked hyper-parameters."""
        return {
            "reg_covar": self.reg_covar,
        }

    @property
    def name(self) -> str:
        return "MLE"