Source code for czsc.utils.corr

# -*- coding: utf-8 -*-
"""
author: zengbin93
email: zeng_bin8888@163.com
create_dt: 2022/1/29 15:01
describe: 相关系数计算、可视化

References:
1. https://zhuanlan.zhihu.com/p/362258222
2. https://blog.csdn.net/qq_45538220/article/details/107429201
"""
import numpy as np
import pandas as pd
from tqdm import tqdm
from typing import Union


[docs]def nmi_matrix(df: pd.DataFrame, heatmap=False) -> pd.DataFrame: """计算高维标准化互信息并以矩阵形式输出 :param df: 数据 :param heatmap: 是否绘制热力图 :return: """ import seaborn as sns import matplotlib.pyplot as plt from sklearn import metrics plt.rcParams['font.sans-serif'] = ['SimHei'] # 用来正常显示中文标签 plt.rcParams['axes.unicode_minus'] = False # 用来正常显示负号 cols = df.columns.to_list() m_dict = {} for i, col1 in tqdm(enumerate(cols), desc='nmi'): X = df[col1] for col2 in cols[i:]: Y = df[col2] nmi = metrics.normalized_mutual_info_score(X, Y) m_dict[f"{col1}_{col2}"] = nmi m_dict[f"{col2}_{col1}"] = nmi m = [] for col1 in cols: A = [] for col2 in cols: A.append(m_dict[f"{col1}_{col2}"]) m.append(A) dfm = pd.DataFrame(m, index=cols, columns=cols) if heatmap: print('NMI(标准化互信息) = \n', dfm) plt.close() figure, ax = plt.subplots(figsize=(len(cols), len(cols))) sns.heatmap(dfm, square=True, annot=True, ax=ax) plt.show() return dfm
[docs]def single_linear(y: Union[np.array, list], x: Union[np.array, list] = None) -> dict: """单变量线性拟合 :param y: 目标序列 :param x: 单变量值 :return res: 拟合结果,样例如下 {'slope': 1.565, 'intercept': 67.9783, 'r2': 0.9967} slope 标识斜率 intercept 截距 r2 拟合优度 """ if not x: x = list(range(len(y))) x_squred_sum = sum([x1 * x1 for x1 in x]) xy_product_sum = sum([x[i] * y[i] for i in range(len(x))]) num = len(x) x_sum = sum(x) y_sum = sum(y) delta = float(num * x_squred_sum - x_sum * x_sum) if delta == 0: return {'slope': 0, 'intercept': 0, 'r2': 0} y_intercept = (1 / delta) * (x_squred_sum * y_sum - x_sum * xy_product_sum) slope = (1 / delta) * (num * xy_product_sum - x_sum * y_sum) y_mean = np.mean(y) ss_tot = sum([(y1 - y_mean) * (y1 - y_mean) for y1 in y]) + 0.00001 ss_err = sum([(y[i] - slope * x[i] - y_intercept) * (y[i] - slope * x[i] - y_intercept) for i in range(len(x))]) rsq = 1 - ss_err / ss_tot res = {'slope': round(slope, 4), 'intercept': round(y_intercept, 4), 'r2': round(rsq, 4)} return res
[docs]def cross_sectional_ic(df, x_col='open', y_col='n1b', method='spearman', **kwargs): """分析 df 中 x_col 和 y_col 列的截面相关性(IC) :param df:数据,DateFrame格式 :param x_col:X列 :param y_col:Y列,一般采用下期收益,也就是 n1b :param method:{'pearson', 'kendall', 'spearman'} or callable * pearson : standard correlation coefficient * kendall : Kendall Tau correlation coefficient * spearman : Spearman rank correlation * callable: callable with input two 1d ndarrays and returning a float :return:df,res: 前者是每日相关系数结果,后者是每日相关系数的统计结果 """ dt_col = kwargs.pop('dt_col', 'dt') tqdm.pandas(desc='cross_section_ic') s = df.groupby(dt_col).progress_apply(lambda row: row[x_col].corr(row[y_col], method=method)) df = pd.DataFrame(s, columns=['ic']).reset_index(inplace=False) res = { "x_col": x_col, "y_col": y_col, "method": method, "IC均值": 0, "IC标准差": 0, "ICIR": 0, "IC胜率": 0, "IC绝对值>2%占比": 0, } if df.empty: return df, res df = df[~df['ic'].isnull()].copy() ic_avg = df['ic'].mean() ic_std = df['ic'].std() res['IC均值'] = round(ic_avg, 4) res['IC标准差'] = round(ic_std, 4) res['ICIR'] = round(ic_avg / ic_std, 4) if ic_std != 0 else 0 res['IC胜率'] = round(len(df[df['ic'] > 0]) / len(df), 4) res['IC绝对值>2%占比'] = round(len(df[df['ic'].abs() > 0.02]) / len(df), 4) return df, res