# Source code for so4gp.algorithms.tgrad_ami

# SPDX-License-Identifier: GNU GPL v3
# This file is licensed under the terms of the GNU GPL v3.0.
# See the LICENSE file at the root of this
# repository for complete details.

import time
import numpy as np
from sklearn.feature_selection import mutual_info_regression
from .tgrad import TGrad


class TGradAMI(TGrad):
    """TGrad variant that selects time-delays by comparing Mutual Information (MI)."""

    def __init__(self, *args, min_error: float = 0.0001, **kwargs):
        """
        Algorithm for estimating time-lag using Average Mutual Information (AMI) and KMeans clustering which is
        extended to mining gradual patterns. The average mutual information I(X; Y) is a measure of the
        "information" amount that the random variables X and Y provide about one another.

        This algorithm extends the work published in: https://ieeexplore.ieee.org/abstract/document/8858883.

        TGradAMI is an algorithm that improves the classical TGrad algorithm for extracting more accurate
        temporal gradual patterns. It computes Mutual Information (MI) with respect to target-column with the
        original dataset to get the actual relationship between variables: by computing MI for every possible
        time-delay and if the transformed dataset has almost identical MI to the original dataset, then it
        selects that as the best time-delay. Instead of min-representativity value, the algorithm relies on the
        error-margin between MIs.

        :param args: [required] data source path or Pandas DataFrame, [optional] minimum-support, [optional] eq
        :param kwargs: [required] target-column or attribute or feature, [optional] minimum representativity
        :param min_error: [optional] minimum Mutual Information error margin.

        >>> from so4gp.algorithms import TGradAMI
        >>> import pandas
        >>>
        >>> dummy_data = [["2021-03", 30, 3, 1, 10], ["2021-04", 35, 2, 2, 8], ["2021-05", 40, 4, 2, 7],
        ...               ["2021-06", 50, 1, 1, 6], ["2021-07", 52, 7, 1, 2]]
        >>> dummy_df = pandas.DataFrame(dummy_data, columns=['Date', 'Age', 'Salary', 'Cars', 'Expenses'])
        >>>
        >>> mine_obj = TGradAMI(dummy_df, min_sup=0.5, target_col=1, min_rep=0.5, min_error=0.1)
        >>> result_dict = mine_obj.discover_tgp(use_clustering=True, eval_mode=False)
        >>>
        >>> # print(result['Patterns'])
        >>> print(result_dict)
        """
        super().__init__(*args, **kwargs)

        # Largest per-feature MI deviation that still counts as "MI preserved".
        self._error_margin: float = min_error
        # Attribute columns minus the target column; these are the MI features.
        self._feature_cols: np.ndarray = np.setdiff1d(self.attr_cols, self.target_col)
        # Smallest MI deviation seen by the last search (-1 when MI could not be computed).
        self._mi_error: float = 0

    @property
    def error_margin(self) -> float:
        """Configured MI error margin (``min_error``)."""
        return self._error_margin

    @property
    def mi_error(self) -> float:
        """MI error recorded by the most recent :meth:`find_best_mutual_info` call."""
        return self._mi_error

    @property
    def feature_cols(self) -> np.ndarray:
        """Indices of the attribute columns other than the target column."""
        return self._feature_cols

    def _uniform_steps(self, step: int) -> dict:
        """Map every feature column index to the same transformation step."""
        return {int(col): step for col in self._feature_cols}

    def find_best_mutual_info(self):
        """
        A method that computes the mutual information I(X; Y) of the original dataset and of every transformed
        (time-delayed) dataset w.r.t. the target column, then selects the transformation step(s) whose MI is
        closest to that of the original dataset.

        The search stops early at the first step where every feature's MI deviation is within the error margin,
        or at the first step for which MI cannot be computed (the MI error is then recorded as -1). Otherwise,
        each feature is assigned the step with the smallest deviation. MI values of 0 (0 indicates NO MI) in the
        time-delayed datasets are replaced with -1 before this comparison so that they are easy to tell apart
        from very small MI values.

        NOTE(review): only the time-delayed MI values are re-encoded; the MI of the original dataset is left
        unchanged. Confirm whether the original MI should be re-encoded as well.

        Side effects: updates ``self._mi_error`` and ``self.min_rep``.

        :return: tuple ``({column index: transformation step}, largest transformation step)``
        :raises ValueError: when there is no candidate transformation step to evaluate.
        """

        # 1. Compute MI for original dataset w.r.t. target-col
        y = np.array(self.full_attr_data[self.target_col], dtype=float).T
        x_data = np.array(self.full_attr_data[self._feature_cols], dtype=float).T
        init_mi_info = np.array(mutual_info_regression(x_data, y), dtype=float)

        # 2. Compute all the MI for every time-delay and compute error
        mi_list = []
        for step in range(1, self.max_step):
            # Compute MI
            attr_data, _ = self.transform_and_mine(step, return_patterns=False)
            y = np.array(attr_data[self.target_col], dtype=float).T
            x_data = np.array(attr_data[self._feature_cols], dtype=float).T
            try:
                mi_vals = np.array(mutual_info_regression(x_data, y), dtype=float)
            except ValueError:
                # MI could not be computed for this transformed dataset (presumably too few
                # rows remain -- TODO confirm); settle on this step and flag the error as -1.
                self._mi_error = -1
                self.min_rep = round(((self.row_count - step) / self.row_count), 5)
                return self._uniform_steps(step), step

            # Compute MI error (absolute per-feature deviation from the original MI)
            error_arr = np.sqrt(np.square(np.subtract(mi_vals, init_mi_info)))
            if np.all(error_arr <= self._error_margin):
                # MI is preserved for every feature: this step is good enough.
                self._mi_error = round(np.min(error_arr), 5)
                self.min_rep = round(((self.row_count - step) / self.row_count), 5)
                return self._uniform_steps(step), step
            mi_list.append(mi_vals)

        if not mi_list:
            # Without this guard NumPy fails below with an opaque broadcasting/argmin error.
            raise ValueError(
                f"No candidate transformation steps to evaluate (max_step={self.max_step}); "
                "cannot estimate the best time-delay."
            )
        mi_info_arr = np.array(mi_list, dtype=float)

        # 3. Standardize MI array
        mi_info_arr[mi_info_arr == 0] = -1

        # 4. Identify steps (for every feature w.r.t. target) with minimum error from initial MI
        error_arr = np.sqrt(np.square(np.subtract(mi_info_arr, init_mi_info)))
        # Row i of the array holds step i + 1, hence the "+ 1" when converting indices to steps.
        optimal_steps_arr = np.argmin(error_arr, axis=0)
        max_step = int(np.max(optimal_steps_arr)) + 1

        # 5. Integrate feature indices with the computed steps
        optimal_dict = {int(col): int(best + 1) for col, best in zip(self._feature_cols, optimal_steps_arr)}
        self._mi_error = round(np.min(error_arr), 5)
        self.min_rep = round(((self.row_count - max_step) / self.row_count), 5)
        return optimal_dict, max_step

    def gather_delayed_data(self, optimal_dict: dict, max_step: int):
        """
        A method that combines attribute data with different data transformations and collects the
        corresponding time-delay values for each transformed attribute.

        The target column and the date-time columns are kept un-shifted; every other column is shifted by its
        own step from ``optimal_dict``. All columns are truncated to the number of rows left by the largest step.

        :param optimal_dict: mapping ``{column index: transformation step}`` for the non-target, non-time columns.
        :param max_step: Largest data transformation step.
        :return: tuple ``(combined transformed dataset, time-delay values)``; the dataset has one row per column
            (``None`` when there are no columns) and the time-delay array has one row per transformed column.
        :raises KeyError: when ``optimal_dict`` has no entry for a column that must be transformed.
        """

        delayed_data: np.ndarray | None = None
        time_data = []
        n = self.row_count
        k = n - max_step  # Number of rows created by the largest step-delay

        for col_index in range(self.col_count):
            if (col_index == self.target_col) or (col_index in self.time_cols):
                # date-time column OR target column
                temp_row = self.full_attr_data[col_index][0:k]
            else:
                # other attributes
                step = optimal_dict[col_index]
                _, time_diffs = self.get_time_diffs(step)

                # Shift by this column's step, then keep the first k items
                temp_row = self.full_attr_data[col_index][step:n][0:k]

                # Time-lags differ for every step, so they are collected per column
                time_data.append([time_diffs[i] for i in range(k)])

            delayed_data = temp_row if delayed_data is None else np.vstack((delayed_data, temp_row))

        return delayed_data, np.array(time_data)

    def discover_tgp(self, use_clustering: bool = False, transformation_steps: dict = None, eval_mode: bool = False):
        """
        A method that applies mutual information concept, clustering, and hill-climbing algorithm to find the
        best data transformation that maintains MI and estimate the best time-delay value of the mined Fuzzy
        Temporal Gradual Patterns (FTGPs).

        :param use_clustering: Use a clustering algorithm to estimate the best time-delay value.
        :param transformation_steps: Data transformation steps as ``{column index: step}`` (used to override the
            computed transformation steps). When ``None``, the steps are computed from mutual information.
        :param eval_mode: Run algorithm in evaluation mode.
        :return: Python dict holding the run summary and the FTGPs (key ``'Patterns'``); in evaluation mode it
            also holds ``'Transformation Steps'``, ``'Time Data'`` and ``'Transformed Data'``.
        """

        start = time.time()
        self.clear_gradual_patterns()

        # 1. Compute and find the lowest mutual information
        if transformation_steps is not None:
            optimal_dict = transformation_steps
            # Largest user-supplied step, never below 0.
            max_step = max((0, *optimal_dict.values()))
        else:
            optimal_dict, max_step = self.find_best_mutual_info()

        # 2. Create a final (and dynamic) delayed dataset
        delayed_data, time_data = self.gather_delayed_data(optimal_dict, max_step)

        # 3. Discover temporal-GPs from time-delayed data
        lst_tgp = self._mine_gps_at_step(time_delay_data=time_data, attr_data=delayed_data,
                                         clustering_method=use_clustering)

        # 4. Organize FTGPs into a single list
        if lst_tgp:
            for tgp in lst_tgp:
                self.add_gradual_pattern(tgp)

        # 5. Check if the algorithm is in evaluation mode
        if eval_mode:
            title_row = list(self.titles)
            # Time-delay values exist only for the transformed (non-target, non-time) columns.
            time_title = [txt for col, txt in enumerate(self.titles)
                          if (col != self.target_col) and (col not in self.time_cols)]
            add_dict = {
                'Patterns': self.display_patterns,
                'Transformation Steps': optimal_dict,
                'Time Data': np.vstack((np.array(time_title), time_data.T)),
                'Transformed Data': np.vstack((np.array(title_row),
                                               delayed_data.T if delayed_data is not None else np.array([]))),
            }
        else:
            add_dict = {"Patterns": self.display_patterns}

        duration = time.time() - start
        out_dict: dict[str, str | list | np.ndarray | None | dict] = {
            "Algorithm": "TGradAMI",
            "Minimum Representation": f"{self.min_rep:.2f}",
            "MI Minimum Error": f"{self.error_margin:.2f}",
            "MI Error": f"{self.mi_error:.2f}",
            "Target Column": f"{self._target_col}",
            "Run-time": f"{duration:.6f} seconds"}
        # The summary is written out before the (potentially large) pattern data is merged in.
        self.generate_output_files(out_dict, target_col=self.target_col)
        out_dict.update(add_dict)
        return out_dict