# Source code for so4gp.algorithms.grad_pfs

# -*- coding: utf-8 -*-
# SPDX-License-Identifier: GNU GPL v3
# This file is licensed under the terms of the GNU GPL v3.0.
# See the LICENSE file at the root of this
# repository for complete details.

import ntpath
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from matplotlib.backends.backend_pdf import PdfPages
from .graank import GRAANK
from .cluster_gp import ClusterGP
from .graank_aco import AntGRAANK
from .graank_ga import GeneticGRAANK


[docs] class GradPFS: """ GradPFS is a filter-based algorithm for performing univariate or/and multivariate feature selection through gradual patterns for regression tasks. This algorithm is published in: """
[docs] def __init__(self, data_src: str | pd.DataFrame, min_score: float = 0.75, target_col: int | None = None): """ An algorithm based on the filter method for performing univariate or/and multivariate feature selection through gradual patterns for regression tasks (not suitable for classification tasks). The results are returned as a Pandas DataFrame. :param data_src: [required] the data in a CSV file or Pandas DataFrame. :param min_score: [optional] user-specified minimum correlation score for filtering redundant features, default=0.75. :param target_col: [optional] user-specified target column index, default=None. >>> import pandas >>> from so4gp.algorithms.grad_pfs import GradPFS >>> >>> dummy_data = [[30, 3, 1, 10], [35, 2, 2, 8], [40, 4, 2, 7], [50, 1, 1, 6], [52, 7, 1, 2]] >>> dummy_df = pandas.DataFrame(dummy_data, columns=['Age', 'Salary', 'Cars', 'Expenses']) >>> >>> fs_obj = GradPFS(data_src=dummy_df) >>> gp_cor = fs_obj.univariate_fs() >>> fs_obj.generate_pdf_report(fs_type='U') >>> >>> # fs_obj.target_col = 2 >>> # m_fs = fs_obj.multivariate_fs() >>> print(gp_cor) Age Salary Cars Expenses Age 1.0 0.6 -0.4 -1.0 Salary 0.6 1.0 -0.3 -0.6 Cars -0.4 -0.3 1.0 0.4 Expenses -1.0 -0.6 0.4 1.0 """ self.data_src = data_src """type data_src: str | pd.DataFrame""" self.file_path = "" """:type file_path: str""" if type(data_src) is str: self.file_path = data_src self.thd_score = min_score """:type thd_score: float""" self.target_col = target_col """:type target_col: int | None""" self.titles, self.data = None, None """:type titles: list | None""" """:type data: np.ndarray | None"""
def univariate_fs(self) -> pd.DataFrame: """ A method that runs the univariate GradPFS feature selection algorithm. The method that calculates the gradual correlation between each pair of attributes in the dataset. This is achieved by mining 2-attribute GPs and using their highest support values to show the correlation between them. The method returns a correlation matrix of feature similarities. :return: Correlation matrix as a pandas dataframe. """ # 1. Instantiate GRAANK object and extract GPs grad = GRAANK(self.data_src) self.titles = grad.titles self.data = grad.data grad.discover(ignore_support=True, apriori_level=2, target_col=self.target_col) # 2. Create a correlation matrix n = grad.col_count corr_mat = np.zeros((n, n), dtype=float) np.fill_diagonal(corr_mat, 1) # 3. Extract column names col_names = [] for col_obj in grad.titles: # col_names[int(col_obj[0])] = col_obj[1].decode() col_names.append(col_obj[1].decode()) col_names = np.array(col_names) # 4. Update correlation matrix with GP support for gp in (grad.gradual_patterns or []): score = gp.support i = int(gp.gradual_items[0].attribute_col) j = int(gp.gradual_items[1].attribute_col) i_symbol = str(gp.gradual_items[0].symbol) j_symbol = str(gp.gradual_items[1].symbol) if i_symbol != j_symbol: score = -score if abs(corr_mat[i][j]) < abs(score): corr_mat[i][j] = score corr_mat[j][i] = score # 5. Create Pandas DataFrame and return it as a result corr_mat = np.round(corr_mat, 4) corr_df = pd.DataFrame(corr_mat, columns=col_names) """:type corr_df: pd.DataFrame""" corr_df.index = col_names return corr_df def multivariate_fs(self, algorithm: str = 'GRAANK') -> pd.DataFrame | None: """ A method that runs the multivariate GradPFS feature selection algorithm. First, this method mines for Gradual Patterns (GPs) that contain the target feature. These GPs are considered to be relevant to the target variable. 
Second, the algorithm identifies the features associated with the mined GPs and extracts them; the remaining features are considered to be the most irrelevant to the target feature. This method raises a ValueError exception if the user does not specify the target feature column index. :param algorithm: [optional] the algorithm to use: 'GRAANK', 'ACO' - Ant Colony GRAANK, 'CLU' - Clustering GRAANK, 'GEA' - Genetic Algorithm GRAANK. (default = 'GRAANK') :return: A list of the correlated attributes as a Pandas dataframe. """ if self.target_col is None: raise ValueError("You must specify a target feature (column index).") # 1. Instantiate GRAANK object and extract GPs algorithm += 'GRAANK' # bypass for now (TO BE DELETED) if algorithm == 'CLU': grad = ClusterGP(self.data_src, min_sup=self.thd_score) elif algorithm == 'ACO': grad = AntGRAANK(self.data_src, min_sup=self.thd_score) elif algorithm == 'CLU': grad = GeneticGRAANK(self.data_src, min_sup=self.thd_score) else: grad = GRAANK(self.data_src, min_sup=self.thd_score) grad.discover(target_col=self.target_col) # grad.discover(target_col=self.target_col, exclude_target=True) self.titles = grad.titles self.data = grad.data # 2. Extract column names col_names = [] for col_obj in grad.titles: col_names.append(col_obj[1].decode()) col_names = np.array(col_names) # 3a. Collect the irrelevant features (and redundant among themselves) rel_lst = [] for gp in (grad.gradual_patterns or []): rel_attributes = gp.decompose()[0] for attr in rel_attributes: rel_lst.append(attr) rel_set = set(rel_lst) rel_set = rel_set.difference({self.target_col}) # # 4b. Identify irrelevant features by eliminating the relevant ones irr_set = set(grad.attr_cols.tolist()).difference(rel_set) irr_set = irr_set.difference({self.target_col}) # # 3b. 
Collect the irrelevant features (and redundant among themselves) # irr_lst = [] # for gp in grad.gradual_patterns: # irr_attributes = gp.get_attributes()[0] # for attr in irr_attributes: # irr_lst.append(attr) # irr_set = set(irr_lst) # # # 4b. Identify relevant features by eliminating the irrelevant ones # rel_set = set(grad.attr_cols.tolist()).difference(irr_set) # rel_set = rel_set.difference({self.target_col}) # # 5. Update the correlation list (relevant features w.r.t. target feature) irr_features = col_names[list(irr_set)] rel_features = col_names[list(rel_set)] corr_lst = [[{str(col_names[self.target_col])}, set(rel_features.tolist()), set(irr_features.tolist())], [{self.target_col}, rel_set, irr_set]] # # 3c. Update correlation matrix with GP support # corr_lst = [] # for gp in grad.gradual_patterns: # score = gp.support # lst_col = [] # lst_attr = [] # for gi in gp.gradual_items: # att = gi.attribute_col # att = -att if gi.symbol == '-' else att # lst_col.append(att) # lst_attr.append(col_names[att]) # corr_lst.append([set(lst_col), set(lst_attr), score]) # 6. Create Pandas DataFrame and return it as a result if len(corr_lst) <= 0: return None corr_arr = np.array(corr_lst, dtype=object) # corr_df = pd.DataFrame(corr_arr, columns=[ "Attribute Indices", "Relevant Features", "GradPFS Score"]) corr_df = pd.DataFrame(corr_arr, columns=["Target Feature", "Relevant Features", "Irrelevant Features"]) """:type corr_df: pd.DataFrame""" return corr_df def generate_pdf_report(self, fs_type: str = 'U') -> bool: """ A method that executes GradPFS algorithm for either Univariate Feature Selection ('U') or Multivariate Feature Selection ('M') and generates a PDF report. :param fs_type: Feature selection type: 'U' -> univariate or 'M' -> multivariate. Default is 'U' :return: True if a PDF report is generated. """ # 2. Run a feature selection algorithm if fs_type == 'M': # 2a. 
Multivariate feature selection corr_df = self.multivariate_fs() fig_corr = None if corr_df is None: return False # Create table data tab_data = np.vstack([corr_df.columns, corr_df.to_numpy()]) col_width = [1/3, 1/3, 1/3] else: # 2b. Univariate feature selection corr_mat_df = self.univariate_fs() lst_redundant = GradPFS.find_redundant_features(corr_mat_df.to_numpy(), self.thd_score) # Create a plot figure fig_corr = plt.Figure(figsize=(8.5, 8), dpi=300) ax_corr = fig_corr.add_subplot(1, 1, 1) sns.heatmap(corr_mat_df, annot=True, cmap="coolwarm", annot_kws={"size": 7}, ax=ax_corr) ax_corr.set_title("Univariate Feature Correlation Matrix") fig_corr.tight_layout(pad=3) # Add padding to ensure the plot doesn't occupy the whole page # Create table data tab_data = [["Redundant Features", "GradPFS Score"]] for x in lst_redundant: feat = x[0] scores = np.round(x[1], 3) # Handle both single numbers and arrays/lists if isinstance(scores, np.ndarray): score_val = tuple(scores.tolist()) else: # If it's just a single float/int score_val = scores tab_data.append([str(feat), str(score_val)]) tab_data = np.array(tab_data, dtype=object) col_width = [1/2, 1/2] # 3. 
Produce PDF report if isinstance(self.data_src, str): f_name = ntpath.basename(self.data_src) f_name = f_name.replace('.csv', '') else: f_name = "" if fs_type == 'M': out_info = [["Feature Selection Type", "Multivariate"]] pdf_file = f"{f_name}_multi_report.pdf" else: out_info = [["Feature Selection Type", "Univariate"]] pdf_file = f"{f_name}_uni_report.pdf" out_info.append(["Minimum Correlation Score", f"{self.thd_score}"]) out_info = np.array(out_info, dtype=object) out_file = [["Encoding", "Feature Name"]] for txt in self.titles: col = int(txt[0]) if (self.target_col is not None) and (col == self.target_col): out_file.append([f"{txt[0]}", f"{txt[1].decode()}** (target feature)"]) else: out_file.append([f"{txt[0]}", f"{txt[1].decode()}"]) # out_file.append(["File", f"{f_path}"]) out_file = np.array(out_file, dtype=object) with (PdfPages(pdf_file)) as pdf: pdf.savefig(GradPFS.generate_table("Gradual Pattern-based Feature Selection (GradPFS) Report", out_info, [2/3,1/3], xscale=0.5)) if fig_corr is not None: pdf.savefig(fig_corr) pdf.savefig(GradPFS.generate_table("", out_file, [1/4, 3/4])) pdf.savefig(GradPFS.generate_table("", tab_data, col_width)) return True @staticmethod def find_redundant_features(corr_arr: np.ndarray, thd_score: float) -> list: """ A method that identifies features that are redundant using their correlation score. :param corr_arr: A correlation matrix as a numpy array. :param thd_score: A user-specified minimum correlation score for filtering redundant features. :return: Redundant features with the corresponding similarity/correlation score. 
""" lst_redundant = [] """:type lst_redundant: list""" lst_info = [] """:type lst_info: list""" for i in range(corr_arr.shape[0]): # row index lst_sim = [] cor_scores = [] for j in range(i, corr_arr.shape[1]): # col index cor_score = corr_arr[i, j] if abs(cor_score) > thd_score: lst_sim.append((-j if cor_score < 0 else j)) cor_scores.append(round(float(abs(cor_score)), 3)) if len(lst_sim) <= 1: continue is_subset = False for item in lst_redundant: is_subset = set(lst_sim).issubset(item) if is_subset: break if not is_subset: lst_redundant.append(set(lst_sim)) lst_info.append([set(lst_sim), cor_scores]) return lst_info @staticmethod def find_similar(corr_set: dict, cor_arr: np.ndarray): """ A method that searches a correlation matrix for a specific set of features. :param corr_set: A set of features. :param cor_arr: A correlation matrix as a numpy array. :return: Found a set of features and correlation score. """ row_idx = list(corr_set)[0] lst_sim = [] cor_scores = [] """:type lst_sim: list""" for j in list(corr_set): cor_score = cor_arr[row_idx, j] cor_scores.append(round(float(cor_score), 3)) lst_sim.append(j) sim_set = set(lst_sim) """:type sim_set: set""" return [sim_set, cor_scores] @staticmethod def generate_table(title: str, data: np.ndarray, col_width: list, xscale: float = 1, yscale: float = 1.5): """ A method that represents data in a table format using the matplotlib library. :param title: The title of the table. :param data: The data to be displayed. :param col_width: The width size of each column. :param xscale: The width of the table. :param yscale: The length of the table. :return: A matplotlib table. 
""" #fig_tab = plt.Figure(figsize=(8.5, 11), dpi=300) # ax_tab = fig_tab.add_subplot(1, 1, 1) fig_tab, ax_tab = plt.subplots(figsize=(8.5, 11), dpi=300) ax_tab.set_axis_off() ax_tab.set_title(f"{title}") tab = ax_tab.table(cellText=data[:, :], loc='upper center', colWidths=col_width, cellLoc='left', ax=ax_tab) tab.scale(xscale, yscale) fig_tab.tight_layout() return fig_tab