Source code for src.utils

# -*- coding: utf-8 -*-
""" 
Модуль містить допоміжні функції для підготовки даних та проведення розрахунків. 
"""
import numpy as np
import pandas as pd
from scipy import stats
from pathlib import Path
from functools import reduce


ROOT = Path(__file__).resolve().parent.parent
PATH_RAW = ROOT / "data" / "raw"
PATH_INTERIM = ROOT / "data" / "interim"
PATH_PROCESSED = ROOT / "data" / "processed"
POPULATION = pd.read_excel(PATH_RAW / "P99" / "population.xls", index_col=0)
REGIONS = {"region": POPULATION.index.tolist()}
REGIONS_MAP = POPULATION["region_id"].astype(float).to_dict()
POPULATION_MAP = POPULATION["population"].to_dict()


def _divide(a, b):
    """ Повертає 0 при діленні на 0 замість `np.inf`. """
    return np.divide(a, b, out=np.zeros_like(a), where=b != 0)


[docs]def merge_all(data, on, return_all_regions=True): """Послідовно виконує pd.merge на масиві таблиць. Parameters ---------- data: list[pd.DataFrame] Список таблиць, які необхідно об'єднати on: str Ключ для об'єднання таблиць return_all_regions : bool Чи включати відсутні області в таблицю Returns ------- `pd.DataFrame` Об'єднана єдина таблиця """ df = reduce(lambda l, r: pd.merge(l, r, on=on, how="outer"), data) if not return_all_regions: return df all_regions = df.append(pd.DataFrame(REGIONS)) return ( all_regions.assign(order=all_regions["region"].map(REGIONS_MAP)) .drop_duplicates("region") .sort_values("order") .drop("order", axis=1) .reset_index(drop=True) )
[docs]def weighted_average(df, columns, weights, multiplier=10): r"""Ф-ція середнього зваженого. .. math:: \bar{x} = \frac{\sum_{i=1}^{n} w_ix_i}{\sum_{i=1}^{n} w_i} \times m де :math:`x` є `значенням`, :math:`w` є його вагою значення, :math:`m` є мультиплікатором. Parameters ---------- df : pd.DataFrame Таблиця з параметрами columns : list[str] Перелік колонок, значення яких слід розрахувати weights: dict[str, float] Словник з назвою параметру та його вагою multiplier : float Мультиплікатор результату Examples -------- >>> df = pd.DataFrame({ "A": [1, 4, 4, 5, 6, 7, 5, 3, 4, 5], "B": [2, 8, 1, 4, 6, 2, 8, 1, 1, 1], }) Функція адаптова під роботу з таблицею (використовує `broadcasting`), що містить окремі колонки, які необхідно порахувати; у реальному застосуванні ці колонки визначаються патерном. >>> weighted_average(df=df, columns=["A", "B"], weights={"A": 1.5, "B": 2}) 0 15.714286 1 62.857143 2 22.857143 3 44.285714 4 60.000000 5 41.428571 6 67.142857 7 18.571429 8 22.857143 9 27.142857 dtype: float64 Returns ------- `pd.Series` Колонка з розрахованим середнім зваженим """ s = sum(df[col] * weights.get(col) for col in columns) / sum(weights.values()) return s * multiplier
[docs]def zscore_wrapper(df, param, cv=1.96): r"""Визначає аутлаєри колонки `param` за допомогою `scipy.stats.zscore <https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.zscore.html>`_. Відхилення (аутлаєр) є статистично значущим, якщо отримане значення є більшим або меншим за критичне значення `cv`. .. math:: z =\frac{x_i-\mu}{\sigma} Parameters ---------- df : pd.DataFrame Таблиця з параметрами param : str Назва параметру cv : float Критичне значення [1.96, 2.58] для 95% та 99% стат. значущості відповідно Examples -------- Певний параметр може містити аутлаєри, які часом можуть виникати внаслідок помилки в даних або свідчити про аномальну ситуацію в певних областях (обидва варіанти можуть викривлювати результати розрахунків). Для прикладу я генерую датасет `df`, що містить одну колонку `param`, котра складається з випадкового набору значень в межах нормального розподілу (normal distribution) >>> df = pd.DataFrame({"param": np.random.standard_normal(size=24)}) >>> df param 0 -1.292244 1 -2.610441 2 0.069343 3 -0.959068 4 0.407773 5 0.192192 6 0.694097 7 -0.758156 8 0.403748 9 1.585239 10 -0.807982 11 -0.507439 12 -0.312586 13 0.624023 14 -0.953230 15 -0.797517 16 1.604804 17 -1.187396 18 -0.535961 19 2.263956 20 -0.182657 21 0.616233 22 -0.928197 23 -0.320723 Функція по суті є обгорткою для ``scipy.stats.zscore`` і повертає ті рядки-аутлаєри (чиї значення zscore виходять за критичне значення ``cv``) >>> zscore_wrapper(df, "param") param 1 -2.610441 19 2.263956 Returns ------- `pd.DataFrame` Таблиця з статистично значущими аутлаєрами """ ss = stats.zscore(df[param], ddof=1, nan_policy="omit") return df.loc[abs(ss) > cv, :]
[docs]def normalize_parameter( array, fill_na=True, min_bound=None, max_bound=None, feature_range=(0, 1), reverse=False, ): r"""Імплементація min-max normalization формули: .. math:: {x}' = a + \frac{(x-min(x))(b-a)}{max(x)-min(x)} де :math:`x` є `array`, :math:`a` та :math:`b` є `feature_range`, що за замовченням є [0, 1]; Якщо параметри `min_bound` та `max_bound` задані, функція ігнорує реальні мінімальні та максимальні значення `array`. Parameters ---------- array : pd.Series Колонка, значення якої нормалізуємо fill_na : bool NaN policy: заповнюємо порожні значення нулями (`True`) або ні (`False`) min_bound: Any[None, int, float] Задана нижня межа параметрів, або задане мінімальне значення колонки. max_bound: Any[None, int, float] Задана верхня межа параметрів, або задане максимальне значення колонки. feature_range : tuple (min, max), default=(0, 1) Шкала, в межах якої трансформуємо дані: [мінімальне, максимальне] reverse : bool Спосіб нормалізації. Найбільше значення отримує 1 якщо `True`, 0 якщо `False`. Examples -------- >>> array = pd.Series([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) За замовченням функція трансформує `array` в шкалу [0, 1], використовуючи наявні максимальні та мінімальні значення колонки. >>> normalize_parameter(array) feature_range=(0, 1); fill_na=True; array_bounds=(1, 10); normalization_bounds=(1, 10) 0 0.000000 1 0.111111 2 0.222222 3 0.333333 4 0.444444 5 0.555556 6 0.666667 7 0.777778 8 0.888889 9 1.000000 dtype: float64 Можемо задати максимальне значення самостійно. Наприклад, якщо ми хочемо порівняти наявність генеральних планів у селах і за найкращий показник свідомо беремо 100%, але в жодній з областей такого показника немає, ми вручну встановлюємо верхню межу як 100% замість максимального значення по областях Спрощений приклад на `array`: якщо `max_bound=15`, а реальне максимальне значення в межах колонки сягає `10`, найкращий з усіх рядків не отримує `1` (тому що порівнюється вже не відносно інших значень, а ще й відносно заданого максимального значення). >>> normalize_parameter(array, max_bound=15) feature_range=(0, 1); fill_na=True; array_bounds=(1, 10); normalization_bounds=(1, 15) 0 0.000000 1 0.071429 2 0.142857 3 0.214286 4 0.285714 5 0.357143 6 0.428571 7 0.500000 8 0.571429 9 0.642857 dtype: float64 Приклад використання іншої від [0, 1] шкали нормалізації: >>> normalize_parameter(array, feature_range=(2, 5)) feature_range=(2, 5); fill_na=True; array_bounds=(1, 10); normalization_bounds=(1, 10) 0 2.000000 1 2.333333 2 2.666667 3 3.000000 4 3.333333 5 3.666667 6 4.000000 7 4.333333 8 4.666667 9 5.000000 dtype: float64 Returns ------- `pd.Series` Колонка з нормалізованими значеннями """ s = array.fillna(0) if fill_na else array array_min, array_max = array_bounds = s.min(), s.max() if min_bound is None: min_bound = array_min if max_bound is None: max_bound = array_max if reverse: min_bound, max_bound = max_bound, min_bound normalization_bounds = (min_bound, max_bound) feature_min, feature_max = feature_range print(f"{feature_range=}; {fill_na=}; {array_bounds=}; {normalization_bounds=}, {reverse=}") return feature_min + ((s - min_bound) * (feature_max - feature_min) / (max_bound - min_bound))
[docs]def save_data(sources, weights, parameter, show_results=False): """Розраховує оцінку галузевого параметру Parameters ---------- sources: list[pd.DataFrame] Список усіх таблиць з нормалізованими показниками нижнього рівня weights: dict[str, float] Словник з назвою параметру та його вагою parameter : str Галузевий параметр (параметр верхнього рівня): [P1, P2, P3, P4, P5, P6, P7, P8] """ _re_raw = "region|p\d{1}_\d{2}_raw$" _re_norm = "region|p\d{1}_\d{2}$" df_raw = merge_all( data=[df.loc[:, df.columns.str.contains(_re_raw)] for df in sources], on="region", ) df_raw.to_csv(PATH_PROCESSED / f"{parameter}_raw.csv", index=False) df = merge_all( data=[df.loc[:, df.columns.str.contains(_re_norm)] for df in sources], on="region", ) columns = df.loc[:, df.columns.str.contains("p")].columns df[parameter] = weighted_average(df, columns, weights) df.to_csv(PATH_PROCESSED / f"{parameter}.csv", index=False) if show_results: return df