# -*- coding: utf-8 -*-
# SPDX-License-Identifier: GNU GPL v3
# This file is licensed under the terms of the GNU GPL v3.0.
# See the LICENSE file at the root of this
# repository for complete details.
import json
import time
import numpy as np
import skfuzzy as fuzzy
import multiprocessing as mp
from sklearn.cluster import KMeans
from sklearn.preprocessing import MinMaxScaler
from .graank import GRAANK
from ..data_gp import DataGP
from ..gradual_patterns import GI, TGP, TimeDelay
[docs]
class TGrad(GRAANK):
[docs]
def __init__(self, *args, target_col: int, min_rep: float = 0.5, **kwargs):
"""
TGrad is an algorithm used to extract temporal gradual patterns from numeric datasets. An algorithm for mining
temporal gradual patterns using fuzzy membership functions. It uses a technique
published in: https://ieeexplore.ieee.org/abstract/document/8858883.
:param args: [required] a data source path of Pandas DataFrame, [optional] minimum-support, [optional] eq
:param target_col: [required] Target column.
:param min_rep: [optional] minimum representativity value.
>>> import so4gp.algorithms import TGrad
>>> import pandas
>>>
>>> dummy_data = [["2021-03", 30, 3, 1, 10], ["2021-04", 35, 2, 2, 8], ["2021-05", 40, 4, 2, 7], ["2021-06", 50, 1, 1, 6], ["2021-07", 52, 7, 1, 2]]
>>> dummy_df = pandas.DataFrame(dummy_data, columns=['Date', 'Age', 'Salary', 'Cars', 'Expenses'])
>>>
>>> mine_obj = TGrad(dummy_df, min_sup=0.5, target_col=1, min_rep=0.5)
>>> result_json = mine_obj.discover_tgp(parallel=True)
>>> # print(result['Patterns'])
>>> print(result_json)
"""
super(TGrad, self).__init__(*args, **kwargs)
self._target_col: int = target_col
self._min_rep: float = min_rep
self._max_step: int = self.row_count - int(min_rep * self.row_count)
self._full_attr_data: np.ndarray = self.data.copy().T
if len(self.time_cols) > 0:
print("Dataset Ok")
self._time_ok: bool = True
else:
print("Dataset Error")
self._time_ok: bool = False
raise Exception('No date-time datasets found')
@property
def target_col(self):
return self._target_col
@property
def min_rep(self):
return self._min_rep
@property
def max_step(self):
return self._max_step
@property
def full_attr_data(self):
return self._full_attr_data
@min_rep.setter
def min_rep(self, value):
if 0 < value <= 1:
self._min_rep = value
def discover_tgp(self, parallel: bool = False, num_cores: int = 1):
"""
Applies fuzzy-logic, data transformation, and gradual pattern mining to mine for Fuzzy Temporal Gradual Patterns.
:param parallel: Allow multiprocessing.
:param num_cores: Number of CPU cores for the algorithm to use.
:return: List of FTGPs as JSON object
"""
start = time.time()
self.clear_gradual_patterns()
# 1. Mine FTGPs
if parallel:
# implement parallel multi-processing
with mp.Pool(num_cores) as pool:
steps = range(self._max_step)
pattern_data = pool.map(self._safe_transform_and_mine, steps)
else:
pattern_data: list = []
for step in range(self._max_step):
t_gps = self._safe_transform_and_mine(
step + 1) # because for-loop it is not inclusive from range: 0 - max_step
pattern_data.append(t_gps)
# 2. Organize FTGPs into a single list
for item in pattern_data:
if item is None:
continue
# Standardize 'item' into a list so we only need one loop
lst_pattern = item if isinstance(item, list) else [item]
for pat in lst_pattern:
if isinstance(pat, TGP):
self.add_gradual_pattern(pat)
duration = time.time() - start
out_dict: dict[str, str | list] = {
"Algorithm": "TGrad",
# "Memory Usage (MiB)": f{mem_use)}",
"Minimum Representation": f"{self.min_rep:.2f}",
"Target Column": f"{self._target_col}",
"Run-time": f"{duration:.6f} seconds"}
self.generate_output_files(out_dict, target_col=self.target_col)
out_dict.update({"Patterns": self.display_patterns})
out: object = json.dumps(out_dict, indent=4)
return out
def transform_and_mine(self, step: int, return_patterns: bool = True):
"""
A method that: (1) transforms data according to a step value and, (2) mines the transformed data for FTGPs.
:param step: Data transformation step.
:param return_patterns: Allow method to mine TGPs.
:return: List of TGPs
"""
# NB: Restructure dataset based on target/reference col
if self._time_ok:
# 1. Calculate the time difference using a step
ok, time_diffs = self.get_time_diffs(step)
if not ok:
msg = "Error: Time in row " + str(time_diffs[0]) \
+ " or row " + str(time_diffs[1]) + " is not valid."
raise Exception(msg)
else:
tgt_col = self._target_col
if tgt_col in self.time_cols:
msg = "Target column is a 'date-time' attribute"
raise Exception(msg)
elif (tgt_col < 0) or (tgt_col >= self.col_count):
msg = "Target column does not exist\nselect column between: " \
"0 and " + str(self.col_count - 1)
raise Exception(msg)
else:
# 2. Transform datasets
delayed_attr_data = None
n = self.row_count
for col_index in range(self.col_count):
# Transform the datasets using (row) n+step
if (col_index == tgt_col) or (col_index in self.time_cols):
# date-time column OR target column
temp_row = self._full_attr_data[col_index][0: (n - step)]
else:
# other attributes
temp_row = self._full_attr_data[col_index][step: n]
delayed_attr_data = temp_row if (delayed_attr_data is None) \
else np.vstack((delayed_attr_data, temp_row))
# print(f"Time Diffs: {time_diffs}\n")
# print(f"{self.full_attr_data}: {type(self.full_attr_data)}\n")
# print(f"{delayed_attr_data}: {type(delayed_attr_data)}\n")
if return_patterns:
# 2. Execute t-graank for each transformation
t_gps = self._mine_gps_at_step(time_delay_data=time_diffs, attr_data=delayed_attr_data)
if len(t_gps) > 0:
return t_gps
return False
else:
return delayed_attr_data, time_diffs
else:
msg = "Fatal Error: Time format in column could not be processed"
raise Exception(msg)
def _safe_transform_and_mine(self, step: int, return_patterns: bool = True):
"""Wrapper to catch exceptions during parallel mining."""
try:
return self.transform_and_mine(step, return_patterns=return_patterns)
except Exception as e:
print(f"Error at step {step}: {e}")
return None
def _mine_gps_at_step(self, time_delay_data: np.ndarray | dict, attr_data: np.ndarray = None,
clustering_method: bool = False) -> list[TGP] | tuple[list[TGP], dict]:
"""
Uses apriori algorithm to find GP candidates based on the target-attribute. The candidates are validated if
their computed support is greater than or equal to the minimum support threshold specified by the user.
:param time_delay_data: Time-delay values
:param attr_data: the transformed data.
:param clustering_method: Find and approximate the best time-delay value using KMeans and Hill-climbing approach.
:return: Temporal-GPs as a list.
"""
try:
# If min-rep is too low
self.fit_bitmap(attr_data)
except ZeroDivisionError:
return []
t_gps: list[TGP] = []
valid_bins_dict: dict = (self.valid_bins or {}).copy()
if clustering_method and isinstance(time_delay_data, np.ndarray):
# Build the main triangular MF using the clustering algorithm
a, b, c = TGrad.build_mf_w_clusters(time_delay_data)
tri_mf_data = np.array([a, b, c])
else:
tri_mf_data = None
invalid_count = 0
while len(valid_bins_dict) > 0:
valid_bins_dict, inv_count = self._gen_apriori_candidates(valid_bins_dict, target_col=self._target_col)
invalid_count += inv_count
for gp_set, gi_data in valid_bins_dict.items():
if type(self) is TGrad:
t_lag = self.get_fuzzy_time_lag(gi_data.bin_mat, time_delay_data, gi_arr=None, tri_mf_data=tri_mf_data)
else:
t_lag = self.get_fuzzy_time_lag(gi_data.bin_mat, time_delay_data, gi_arr=gp_set, tri_mf_data=tri_mf_data)
if t_lag.valid:
tgp: TGP = TGP()
for gi_str in gp_set:
gi: GI = GI.from_string(gi_str)
if gi.attribute_col == self._target_col:
tgp.target_gradual_item = gi
else:
tgp.add_temporal_gradual_item(gi, t_lag)
tgp.support = gi_data.support
warping_set_arr: np.ndarray = np.array(
DataGP.gen_gradual_warping_set(gi_data.bin_mat, as_array=True))
tgp.compute_descriptors(warping_set_arr, obj_count=self.row_count)
t_gps.append(tgp)
return t_gps
def get_time_diffs(self, step: int): # optimized
"""
A method that computes the difference between 2 timestamps separated by a specific transformation step.
:param step: Data transformation step.
:return: Dict of time delay values
"""
size = self.row_count
time_diffs = {} # {row: time-lag}
for i in range(size):
if i < (size - step):
stamp_1 = 0
stamp_2 = 0
for col in self.time_cols: # sum timestamps from all time-columns
temp_1 = str(self.data[i][int(col)])
temp_2 = str(self.data[i + step][int(col)])
temp_stamp_1 = TGrad.get_timestamp(temp_1)
temp_stamp_2 = TGrad.get_timestamp(temp_2)
if (not temp_stamp_1) or (not temp_stamp_2):
# Unable to read time
return False, [i + 1, i + step + 1]
else:
stamp_1 += temp_stamp_1
stamp_2 += temp_stamp_2
time_diff = (stamp_2 - stamp_1)
# if time_diff < 0:
# Error time CANNOT go backwards,
# print(f"Problem {i} and {i + step} - {self.time_cols}")
# return False, [i + 1, i + step + 1]
time_diffs[int(i)] = float(abs(time_diff))
return True, time_diffs
def get_fuzzy_time_lag(self, bin_data: np.ndarray, time_data: np.ndarray | dict, gi_arr: set = None,
tri_mf_data: np.ndarray | None = None) -> TimeDelay:
"""
A method that uses a fuzzy membership function to select the most accurate time-delay value. We implement two
methods: (1) uses classical slide and re-calculate dynamic programming to find the best time-delay value and,
(2) uses metaheuristic hill-climbing to find the best time-delay value.
:param bin_data: Gradual item pairwise matrix.
:param time_data: Time-delay values.
:param gi_arr: Gradual item object.
:param tri_mf_data: The 'a,b,c' values of the triangular MF. Used to find and approximate the best time-delay value
using KMeans and Hill-climbing approach.
:return: TimeDelay object.
"""
if time_data is None:
return TimeDelay()
time_data_as_arr: np.ndarray | None = time_data if isinstance(time_data, np.ndarray) else None
time_data_as_dict: dict | None = time_data if isinstance(time_data, dict) else None
def approx_time_slide_calculate(time_lag_arr: np.ndarray) -> TimeDelay:
"""
A method that selects the most appropriate time-delay value from a list of possible values.
:param time_lag_arr: An array of all the possible time-delay values.
:return: The approximated TimeDelay object.
"""
if len(time_lag_arr) <= 0:
# if time_lags is blank, return nothing
return TimeDelay()
else:
time_lag_arr = np.absolute(np.array(time_lag_arr))
min_a = np.min(time_lag_arr)
max_c = np.max(time_lag_arr)
count = time_lag_arr.size + 3
tot_boundaries = np.linspace(min_a / 2, max_c + 1, num=count)
highest_sup = 0
center = time_lag_arr[0]
size = len(tot_boundaries)
for i in range(0, size, 2):
if (i + 3) <= size:
boundaries = tot_boundaries[i:i + 3:1]
else:
boundaries = tot_boundaries[size - 3:size:1]
memberships = fuzzy.membership.trimf(time_lag_arr, boundaries)
# Compute Support
sup_count = np.count_nonzero(memberships > 0)
total = memberships.size
curr_sup = sup_count / total
# curr_sup = calculate_support(memberships)
if curr_sup > highest_sup:
highest_sup = curr_sup
center = boundaries[1]
if curr_sup >= 0.5:
# print(boundaries[1])
return TimeDelay(int(boundaries[1]), curr_sup)
return TimeDelay(center, highest_sup)
def approx_time_hill_climbing(x_train: np.ndarray, initial_bias: float = 0, step_size: float = 0.9,
max_iterations: int = 10):
"""
A method that uses Hill-climbing algorithm to approximate the best time-delay value given a fuzzy triangular
membership function.
:param x_train: Initial time-delay values as an array.
:param initial_bias: (hyperparameter) initial bias value for the hill-climbing algorithm.
:param step_size: (hyperparameter) step size for the hill-climbing algorithm.
:param max_iterations: (hyperparameter) maximum number of iterations for the hill-climbing algorithm.
:return: Best position to move the triangular MF with its mean-squared-error.
"""
def hill_climbing_cost_function(min_membership: float = 0):
"""
Computes the logistic regression cost function for a fuzzy set created from a
triangular membership function.
:param min_membership: The minimum accepted value to allow membership in a fuzzy set.
:return: Cost function values.
"""
# 1. Generate fuzzy data set using MF from x_data
memberships = np.where(y_train <= b,
(y_train - a) / (b - a),
(c - y_train) / (c - b))
# 2. Generate y_train based on the given criteria (x>minimum_membership)
y_hat = np.where(memberships >= min_membership, 1, 0)
# 3. Compute loss_val
hat_count = np.count_nonzero(y_hat)
true_count = len(y_hat)
loss_val: float = (((true_count - hat_count) / true_count) ** 2) ** 0.5
# loss_val = abs(true_count - hat_count)
return loss_val
# 1. Normalize x_train
x_train = np.array(x_train, dtype=float)
# 2. Perform hill climbing to find the optimal bias
bias = initial_bias
y_train = x_train + bias
best_mse = hill_climbing_cost_function()
for iteration in range(max_iterations):
# a. Generate a new candidate bias by perturbing the current bias
new_bias = bias + step_size * np.random.randn()
# b. Compute the predictions and the MSE with the new bias
y_train = x_train + new_bias
new_mse = hill_climbing_cost_function()
# c. If the new MSE is lower, update the bias
if new_mse < best_mse:
bias = new_bias
best_mse = new_mse
# Make predictions using the optimal bias
return bias, best_mse
# 1. Get Indices
indices = np.argwhere(bin_data == 1)
# 2. Get TimeDelay Array
selected_rows = np.unique(indices.flatten())
if gi_arr is not None:
selected_cols = []
for gi_str in gi_arr:
# Ignore target-col and remove time-cols and target-col from the count
col = GI.from_string(gi_str).attribute_col
if (col != self._target_col) and (col < self._target_col):
selected_cols.append(col - (len(self.time_cols)))
elif (col != self._target_col) and (col > self._target_col):
selected_cols.append(col - (len(self.time_cols) + 1))
selected_cols = np.array(selected_cols, dtype=int)
t_lag_arr = time_data_as_arr[
np.ix_(selected_cols, selected_rows)] if time_data_as_arr is not None else np.array([])
else:
time_lags = []
for row, stamp_diff in (time_data_as_dict or {}).items(): # {row: time-lag-stamp}
if int(row) in selected_rows:
time_lags.append(stamp_diff)
t_lag_arr = np.array(time_lags)
best_time_lag = approx_time_slide_calculate(t_lag_arr)
return best_time_lag
# 3. Approximate TimeDelay value
best_time_lag: TimeDelay = TimeDelay(-1, 0)
if tri_mf_data is not None:
# 3b. Learn the best MF through KMeans and Hill-Climbing
a, b, c = tri_mf_data
best_time_lag = TimeDelay(-1, -1)
fuzzy_set = []
for t_lags in t_lag_arr:
init_bias = abs(b - np.median(t_lags))
slide_val, loss = approx_time_hill_climbing(t_lags, initial_bias=init_bias)
tstamp = int(b - slide_val)
sup = float(1 - loss)
fuzzy_set.append([tstamp, float(loss)])
if sup >= best_time_lag.support and abs(tstamp) > abs(best_time_lag.timestamp):
best_time_lag = TimeDelay(tstamp, sup)
# print(f"New Membership Fxn: {a - slide_val}, {b - slide_val}, {c - slide_val}")
else:
# 3a. Learn the best MF through slide-descent/sliding
for t_lags in t_lag_arr:
time_lag = approx_time_slide_calculate(t_lags)
if time_lag.support >= best_time_lag.support:
best_time_lag = time_lag
return best_time_lag
@staticmethod
def get_timestamp(time_str: str):
"""
A method that computes the corresponding timestamp from a DateTime string.
:param time_str: DateTime value as a string
:return: timestamp value
"""
try:
ok, stamp = DataGP.test_time(time_str)
if ok:
return stamp
else:
return False
except ValueError:
return False
@staticmethod
def build_mf_w_clusters(time_data: np.ndarray | None):
"""
A method that builds the boundaries of a fuzzy Triangular membership function (MF) using Singular Value
Decomposition (to estimate the number of centers) and KMeans algorithm to group time data according to the
identified centers. We then use the largest cluster to build the MF.
:param time_data: Time-delay values as an array.
:return: The boundary values of the triangular membership function.
"""
if time_data is None:
return 0, 0, 0
try:
# 1. Reshape into 1-column dataset
time_data = time_data.reshape(-1, 1)
# 2. Standardize data
scaler = MinMaxScaler()
data_scaled = scaler.fit_transform(time_data)
# 3. Apply SVD
u, s, vt = np.linalg.svd(data_scaled, full_matrices=False)
# 4. Plot singular values to help determine the number of clusters
# Based on the plot, choose the number of clusters (e.g., 3 clusters)
num_clusters = int(s[0])
# 5. Perform k-means clustering
kmeans = KMeans(n_clusters=num_clusters)
kmeans.fit(data_scaled)
# 6. Get cluster centers
centers = kmeans.cluster_centers_.flatten()
# 7. Define membership functions to ensure membership > 0.5
largest_mf = [0, 0, 0]
for center in centers:
half_width = 0.5 / 2 # since the membership value should be > 0.5
a = center - half_width
b = center
c = center + half_width
if abs(c - a) > abs(largest_mf[2] - largest_mf[0]):
largest_mf = [a, b, c]
# 8. Reverse the scaling
a = scaler.inverse_transform([[largest_mf[0]]])[0, 0]
b = scaler.inverse_transform([[largest_mf[1]]])[0, 0]
c = scaler.inverse_transform([[largest_mf[2]]])[0, 0]
# 9. Shift to remove negative MF (we do not want negative timestamps)
if a < 0:
shift_by = abs(a)
a = a + shift_by
b = b + shift_by
c = c + shift_by
return a, b, c
except Exception as e:
print(e)
return 0, 0, 0