# -*- coding: utf-8 -*-
# SPDX-License-Identifier: GNU GPL v3
# This file is licensed under the terms of the GNU GPL v3.0.
# See the LICENSE file at the root of this
# repository for complete details.
"""
@author: Dickson Owuor
@credits: Thomas Runkler, Edmond Menya, and Anne Laurent
@license: GNU GPL v3
@email: owuordickson@gmail.com
@created: 21 July 2021
@modified: 27 October 2022
A collection of classes for pre-processing data for mining gradual patterns.
"""
import gc
import csv
import time
import statistics
import numpy as np
import pandas as pd
from tabulate import tabulate
from dateutil.parser import parse
from .utils import write_file
from .gradual_patterns import GP, TGP, PairwiseMatrix
[docs]
class DataGP:
[docs]
def __init__(self, data_source, min_sup=0.5, eq=False) -> None:
"""
A class for creating data-gp objects. A data-gp object is meant to store all the parameters required by GP
algorithms to extract gradual patterns (GP). It takes a numeric file (in CSV format) as input and converts it
into an object whose attributes are used by algorithms to extract GPs.
:param data_source: [required] a data source, it can either be a 'file in csv format' or a 'Pandas DataFrame'
:type data_source: pd.DataFrame | str
:param min_sup: [optional] minimum support threshold, the default is 0.5
:type min_sup: float
:param eq: [optional] encode equal values as gradual, the default is False
:type eq: bool
"""
self._data_src = data_source
self._thd_supp: float = min_sup
self._include_equal_values: bool = eq
self._titles, self._data = DataGP.read(data_source)
""":type _titles: list"""
""":type _data: np.ndarray"""
self._row_count: int = 0
self._col_count: int = 0
self._time_cols: np.ndarray = np.array([])
self._attr_cols: np.ndarray = np.array([])
self._valid_bins: dict | None = None
self._warping_set: dict | None = None
self._attr_size: int = 0
self._gradual_patterns = None
""":type _gradual_patterns: list[GP] | None"""
self._init_attributes()
@property
def thd_supp(self) -> float:
return self._thd_supp
@property
def titles(self) -> list:
return self._titles
@property
def data(self) -> np.ndarray:
return self._data
@property
def row_count(self) -> int:
return self._row_count
@property
def col_count(self) -> int:
return self._col_count
@property
def time_cols(self) -> np.ndarray:
return self._time_cols
@property
def attr_cols(self) -> np.ndarray:
return self._attr_cols
@property
def valid_bins(self) -> dict | None:
return self._valid_bins
@property
def warping_set(self) -> dict[str, list] | None:
return self._warping_set
@property
def attr_size(self) -> int:
return self._attr_size
@property
def gradual_patterns(self) -> list | None:
return self._gradual_patterns
@property
def display_patterns(self) -> list:
str_gps = []
if self._gradual_patterns is None:
return str_gps
for gp in self._gradual_patterns:
str_gp, gp_params = gp.print(self.titles)
str_gps.append([str_gp, *gp_params])
return str_gps
@property
def display_patterns_as_df(self) -> pd.DataFrame:
if not self._gradual_patterns:
return pd.DataFrame(columns=['Pattern'])
all_rows = []
for gp in self._gradual_patterns:
str_gp, gp_params = gp.print(self.titles, descriptor_title=True)
# Create a clean dictionary for this row
row_data = {"Pattern": str_gp}
for param_dict in gp_params:
row_data.update(param_dict)
all_rows.append(row_data)
return pd.DataFrame(all_rows)
def _init_attributes(self) -> None:
"""Initializes the attributes of the data-gp object."""
def get_attr_cols() -> np.ndarray:
"""
Returns indices of all columns with non-datetime objects
:return: ndarray
"""
all_cols = np.arange(self._col_count)
attr_cols = np.setdiff1d(all_cols, self._time_cols)
return attr_cols
def get_time_cols() -> np.ndarray:
"""
Tests each column's objects for date-time values. Returns indices of all columns with date-time objects
:return: A ndarray object containing the indices of the time columns.
"""
# Retrieve the first column only
time_cols = list()
n = self._col_count
for i in range(n): # check every column/attribute for time format
row_data = str(self._data[0][i])
try:
time_ok, t_stamp = DataGP.test_time(row_data)
if time_ok:
time_cols.append(i)
except ValueError:
continue
return np.array(time_cols)
self._row_count, self._col_count = self._data.shape
self._time_cols = get_time_cols()
self._attr_cols = get_attr_cols()
[docs]
def add_gradual_pattern(self, pattern) -> None:
"""
Adds a gradual pattern to the list of gradual patterns.
:param pattern: A gradual pattern
"""
if self._gradual_patterns is None:
self._gradual_patterns = list()
if not isinstance(pattern, (GP, TGP)):
raise Exception("Pattern must be of type GP, ExtGP, or TGP")
self._gradual_patterns.append(pattern)
[docs]
def clear_gradual_patterns(self) -> None:
"""Clears the list of gradual patterns."""
self._gradual_patterns = list()
[docs]
def remove_subsets(self, gi_arr:set, gradual_patterns: list[GP]|None=None) -> None:
"""
Remove subset GPs from the list.
:param gi_arr: Gradual items in an array
:param gradual_patterns: List of gradual patterns (if None, use the object's GPs)
:return: List of GPs
"""
gps = self._gradual_patterns if gradual_patterns is None else gradual_patterns
if gps is None:
return
for gp in gps:
result1 = set(gp.as_set).issubset(gi_arr)
result2 = set(gp.as_swapped_set).issubset(gi_arr)
if result1 or result2:
gps.remove(gp)
[docs]
def fit_bitmap(self, attr_data=None) -> None:
"""
Generates bitmaps for columns with numeric objects. It stores the bitmaps in attribute valid_bins (those bitmaps
whose computed support values are greater or equal to the minimum support threshold value).
:param attr_data: Stepped attribute objects
:type attr_data: np.ndarray | None
:return: void
"""
# (check) implement parallel multiprocessing
# 1. Transpose csv array data
if attr_data is None:
attr_data = self._data.T
self._attr_size = self._row_count
else:
self._attr_size = len(attr_data[self._attr_cols[0]])
# 2. Construct and store 1-item_set valid bins
# execute binary rank to calculate support of a pattern
n = self._attr_size
self._valid_bins = {}
for col in self._attr_cols:
# 2a. Generate 1-itemset gradual-items
col_data = np.array(attr_data[col], dtype=float)
with np.errstate(invalid='ignore'):
if not self._include_equal_values:
temp_pos = np.array(col_data > col_data[:, np.newaxis])
else:
temp_pos = np.array(col_data >= col_data[:, np.newaxis])
np.fill_diagonal(temp_pos, False)
# 2b. Check support of each generated item set
supp = float(np.sum(temp_pos)) / float(n * (n - 1.0) / 2.0)
if (supp >= self._thd_supp )and (self._valid_bins is not None):
self._valid_bins[f"{col}+"] = PairwiseMatrix(bin_mat=temp_pos, support=supp)
self._valid_bins[f"{col}-"] = PairwiseMatrix(bin_mat=temp_pos.T, support=supp)
# print(self._valid_bins)
valid_bins_len = len(self._valid_bins) if self._valid_bins is not None else 0
if valid_bins_len < 3:
self._valid_bins = None
gc.collect()
[docs]
def fit_warpingset(self) -> None:
"""
Generates transaction ids (tids) for each column/feature with numeric objects. It stores the tids in attribute
valid_tids (those tids whose computed support values are greater or equal to the minimum support threshold
value).
The method decomposes the pairwise matrix of a gradual item/pattern into a warping set. Attributes that have
strong correlation will produce a warping set with dense zigzag patterns when plotted as a graph. Those with weak
correlation will produce a warping set with sparse zigzag patterns.
"""
if self._valid_bins is None:
self.fit_bitmap()
if self._valid_bins is None:
return
n = self._row_count
self._warping_set = {}
for gi_str, gi_data in self._valid_bins.items():
lst_ij: list = list(DataGP.gen_gradual_warping_set(gi_data.bin_mat))
# set_ij = set(sorted(list(lst_ij), key=lambda x: x[0])) ## Messes with the order of the items in the set
tids_len = len(lst_ij)
supp = float((tids_len*0.5) * (tids_len - 1)) / float(n * (n - 1.0) / 2.0)
if (supp >= self._thd_supp) and self._warping_set is not None:
self._warping_set[gi_str] = lst_ij
[docs]
def generate_output_files(self, alg_data: dict, target_col: int = None, save_to_file: bool = True):
"""
Generates output of results (as files) for the GP mining algorithm.
:param alg_data: Dictionary of algorithm parameters.
:param target_col: Index of the target column.
:param save_to_file: If True, saves the output to files.
"""
list_gp = self.gradual_patterns
num_patterns = len(list_gp) if list_gp is not None else 0
f_name = str(str(alg_data['Algorithm']) + '_' + str(time.time()).replace('.', '', 1))
out_txt = ""
for key, val in alg_data.items():
out_txt += f"{key}: {val}\n"
out_txt += f"No. of (dataset) attributes: {self.col_count}\n"
out_txt += f"No. of (dataset) objects: {self.row_count} \n"
out_txt += f"Minimum support: {self.thd_supp}\n"
# out_txt += f"Number of cores: {num_cores}\n"
out_txt += f"Number of patterns: {num_patterns}\n"
out_txt += f"\nAttributes:\n"
tgt_col = target_col if target_col is not None else -1
for i, txt in enumerate(self.titles):
if i == tgt_col:
out_txt += f"{i}. {txt}**\n"
else:
out_txt += f"{i}. {txt}\n"
out_txt += f"\nFile: {self._data_src if isinstance(self._data_src, str) else 'a dataframe'}\n"
out_txt += str("\nPattern : Support" + '\n')
list_tgp = self.gradual_patterns
if list_tgp is not None:
for tgp in list_tgp:
gp_str = f"{tgp.to_string()} : {tgp.support}"
if len(gp_str) > 100:
gp_str = gp_str[:100] + '\n' + gp_str[100:]
out_txt += f"{gp_str}\n"
if not save_to_file:
print(out_txt)
if save_to_file:
gp_df = self.display_patterns_as_df
gp_df.to_csv(str(f_name+'.csv'), index=False)
write_file(out_txt, str(f_name+'.txt'), wr=True)
[docs]
@classmethod
def analyze_gps(cls, data_src: pd.DataFrame | str, min_sup: float, est_gps: list[GP], approach: str = 'bfs') -> str:
"""
For each estimated GP, computes its true support using the GRAANK approach and returns the statistics (% error,
and standard deviation).
>>> import so4gp as sgp
>>> import pandas
>>> dummy_data = [[30, 3, 1, 10], [35, 2, 2, 8], [40, 4, 2, 7], [50, 1, 1, 6], [52, 7, 1, 2]]
>>> columns = ['Age', 'Salary', 'Cars', 'Expenses']
>>> dummy_df = pandas.DataFrame(dummy_data, columns=['Age', 'Salary', 'Cars', 'Expenses'])
>>>
>>> estimated_gps = list()
>>> temp_gp = sgp.GP()
>>> for gi_str in ['0+', '1-']:
>>> temp_gp.add_gradual_item(sgp.GI.from_string(gi_str))
>>> temp_gp.support = 0.5
>>> estimated_gps.append(temp_gp)
>>> temp_gp = sgp.GP()
>>> for gi_str in ['1+', '3-', '0+']:
>>> temp_gp.add_gradual_item(sgp.GI.from_string(gi_str))
>>> temp_gp.support = 0.48
>>> estimated_gps.append(temp_gp)
>>> res = sgp.analyze_gps(dummy_df, min_sup=0.4, est_gps=estimated_gps, approach='bfs')
>>> print(res)
Gradual Pattern Estimated Support True Support Percentage Error Standard Deviation
['0+', '1-'] 0.5 0.4 25.0% 0.071
['1+', '3-', '0+'] 0.48 0.6 -20.0% 0.085
:param data_src: Data set file
:param min_sup: Minimum support (set by user)
:param est_gps: Estimated GPs
:param approach: 'Bfs' (default) or 'dfs'
:return: Tabulated results
"""
if approach == 'dfs':
d_set = cls(data_src, min_sup)
d_set.fit_warpingset()
else:
d_set = cls(data_src, min_sup)
d_set.fit_bitmap()
headers = ["Gradual Pattern", "Estimated Support", "True Support", "Percentage Error", "Standard Deviation"]
data = []
for est_gp in est_gps:
est_sup = est_gp.support
est_gp.support = 0
if approach == 'dfs':
true_gp = est_gp.validate_tree(d_set)
else:
true_gp = est_gp.validate_graank(d_set)
true_sup = true_gp.support
if true_sup == 0:
percentage_error = np.inf
st_dev = np.inf
else:
percentage_error = ((est_sup - true_sup) / true_sup) * 100
st_dev = statistics.stdev([est_sup, true_sup])
if len(true_gp.gradual_items) == len(est_gp.gradual_items):
data.append(
[est_gp.to_string(), round(float(est_sup), 3), round(float(true_sup), 3), str(round(float(percentage_error), 3)) + '%',
round(float(st_dev), 3)])
else:
data.append([est_gp.to_string(), round(est_sup, 3), -1, np.inf, np.inf])
return tabulate(data, headers=headers)
[docs]
@staticmethod
def gen_gradual_warping_set(pairwise_mat: np.ndarray, as_array: bool = False) -> list[tuple[int, int]] | np.ndarray:
"""
A method that decomposes the pairwise matrix of a gradual item/pattern into a warping set. Attributes that have
strong correlation will produce a warping set with dense zigzag patterns when plotted as a graph. Those with weak
correlation will produce a warping set with sparse zigzag patterns.
:param pairwise_mat: The pairwise matrix of a gradual item/pattern.
:param as_array: If True, returns the warping path as a numpy array else as a list of tuples.
:return: A list array of the warping path (as an edge list).
"""
edge_lst: list[tuple[int, int]] = [(i, j) for i, row in enumerate(pairwise_mat) for j, val in enumerate(row) if val]
edge_lst = sorted(list(edge_lst), key=lambda x: x[0])
if as_array:
return np.array(edge_lst)
return edge_lst
[docs]
@staticmethod
def read(data_src) -> tuple[list, np.ndarray]:
"""
Reads all the contents of a file (in CSV format) or a data-frame. Checks if its columns have numeric values. It
separates its column headers (titles) from the objects.
:param data_src: A data source, it can either be a 'file in csv format' or a 'Pandas DataFrame'
:type data_src: pd.DataFrame | str
:return: The title, column objects
"""
# 1. Retrieve data set from source
if isinstance(data_src, pd.DataFrame):
# a. DataFrame source
# Check column names
try:
# Check data type
_ = data_src.columns.astype(float)
# Add column values
data_src.loc[-1] = data_src.columns.to_numpy(dtype=float) # adding a row
data_src.index = data_src.index + 1 # shifting index
data_src.sort_index(inplace=True)
# Rename column names
header_vals = ['col_' + str(k) for k in np.arange(data_src.shape[1])]
data_src.columns = header_vals
except ValueError:
pass
except TypeError:
pass
# print ("Data fetched from DataFrame")
return DataGP.clean_data(data_src)
else:
# b. CSV file
file = str(data_src)
try:
with open(file, 'r') as f:
dialect = csv.Sniffer().sniff(f.readline(), delimiters=";,' '\t")
f.seek(0)
reader = csv.reader(f, dialect)
raw_data = list(reader)
f.close()
if len(raw_data) <= 1:
raise Exception("CSV file read error. File has little or no data")
else:
# print ("Data fetched from CSV file")
# 2. Get table headers
keys = np.arange(len(raw_data[0]))
if raw_data[0][0].replace('.', '', 1).isdigit() or raw_data[0][0].isdigit():
header_vals = ['col_' + str(k) for k in keys]
else:
if raw_data[0][1].replace('.', '', 1).isdigit() or raw_data[0][1].isdigit():
header_vals = ['col_' + str(k) for k in keys]
else:
header_vals = raw_data[0]
del raw_data[0]
d_frame = pd.DataFrame(raw_data, columns=header_vals)
return DataGP.clean_data(d_frame)
except Exception as error:
raise Exception("Error: " + str(error))
[docs]
@staticmethod
def test_time(date_str) -> None | tuple[bool, float] | tuple[bool, bool]:
"""
Tests if a str represents a date-time variable.
:param date_str: A string
:type date_str: str
:return: bool (True if it is a date-time variable, False otherwise)
"""
# add all the possible formats
try:
if type(int(date_str)):
return False, False
except ValueError:
try:
if type(float(date_str)):
return False, False
except ValueError:
try:
date_time = parse(date_str)
t_stamp = time.mktime(date_time.timetuple())
return True, t_stamp
except ValueError:
raise ValueError('no valid date-time format found')
[docs]
@staticmethod
def clean_data(df) -> tuple[list, np.ndarray]:
"""
Cleans a data-frame (i.e., missing values, outliers) before extraction of GPs
:param df: data-frame
:type df: pd.DataFrame
:return: list (column titles), numpy (cleaned data)
"""
# 1. Remove objects with Null values
df = df.dropna()
# 2. Remove columns with Strings
cols_to_remove = []
for col in df.columns:
try:
_ = df[col].astype(float)
except ValueError:
# Keep time columns
try:
ok, stamp = DataGP.test_time(str(df[col][0]))
if not ok:
cols_to_remove.append(col)
except ValueError:
cols_to_remove.append(col)
pass
except TypeError:
cols_to_remove.append(col)
pass
# keep only the columns in df that do not contain string
df = df[[col for col in df.columns if col not in cols_to_remove]]
# 3. Return titles and data
if df.empty:
raise Exception("Data set is empty after cleaning.")
return list(df.columns), df.values