# -*- coding: utf-8 -*-
"""
author: zengbin93
email: zeng_bin8888@163.com
create_dt: 2022/1/29 15:01
describe: 相关系数计算、可视化
References:
1. https://zhuanlan.zhihu.com/p/362258222
2. https://blog.csdn.net/qq_45538220/article/details/107429201
"""
import numpy as np
import pandas as pd
from tqdm import tqdm
from typing import Union
def nmi_matrix(df: pd.DataFrame, heatmap=False) -> pd.DataFrame:
    """Compute the pairwise normalized mutual information (NMI) of the columns of ``df``.

    Every pair of columns is scored with
    ``sklearn.metrics.normalized_mutual_info_score``. Only the upper triangle
    (diagonal included) is computed; each score is mirrored into the lower
    triangle.

    :param df: input data, one variable per column
    :param heatmap: when True, print the matrix and show it as an annotated
        seaborn heatmap
    :return: square DataFrame whose index and columns are the columns of ``df``
    """
    from sklearn import metrics

    cols = df.columns.to_list()
    n_cols = len(cols)

    # Scores are stored by position rather than under a "col1_col2" string key:
    # string keys collide when column names themselves contain underscores
    # (("a_b", "c") and ("a", "b_c") both map to "a_b_c").
    scores = np.zeros((n_cols, n_cols), dtype=float)

    # Wrapping a sized range (not an enumerate object) lets tqdm show the total.
    for i in tqdm(range(n_cols), desc='nmi'):
        # Positional access also works when column labels are duplicated.
        x = df.iloc[:, i]
        for j in range(i, n_cols):
            nmi = metrics.normalized_mutual_info_score(x, df.iloc[:, j])
            scores[i, j] = nmi
            scores[j, i] = nmi

    dfm = pd.DataFrame(scores, index=cols, columns=cols)

    if heatmap:
        # Plotting libraries are imported, and global rcParams changed, only
        # when a figure is actually requested.
        import seaborn as sns
        import matplotlib.pyplot as plt

        plt.rcParams['font.sans-serif'] = ['SimHei']  # font able to render Chinese labels
        plt.rcParams['axes.unicode_minus'] = False    # render the minus sign correctly

        print('NMI(标准化互信息) = \n', dfm)
        plt.close()
        figure, ax = plt.subplots(figsize=(len(cols), len(cols)))
        sns.heatmap(dfm, square=True, annot=True, ax=ax)
        plt.show()

    return dfm
def single_linear(y: Union[np.array, list], x: Union[np.array, list] = None) -> dict:
    """Fit ``y = slope * x + intercept`` by ordinary least squares.

    :param y: target sequence
    :param x: values of the single explanatory variable; when ``None`` or
        empty, ``0, 1, ..., len(y) - 1`` is used
    :return: fit result, for example
        ``{'slope': 1.565, 'intercept': 67.9783, 'r2': 0.9967}`` where

        * ``slope`` is the fitted slope
        * ``intercept`` is the fitted intercept
        * ``r2`` is the goodness of fit

        All three values are rounded to 4 decimals. When the fit is degenerate
        (``n * sum(x^2) - sum(x)^2 == 0``, e.g. empty input, a single point or
        a constant ``x``) all three values are 0.
    """
    # `not x` raises ValueError for a multi-element numpy array, so test for
    # None / emptiness explicitly.
    if x is None or len(x) == 0:
        x = list(range(len(y)))

    num = len(x)
    x_sum = sum(x)
    y_sum = sum(y)
    x_squared_sum = sum(x1 * x1 for x1 in x)
    xy_product_sum = sum(x1 * y[i] for i, x1 in enumerate(x))

    delta = float(num * x_squared_sum - x_sum * x_sum)
    if delta == 0:
        return {'slope': 0, 'intercept': 0, 'r2': 0}

    y_intercept = (1 / delta) * (x_squared_sum * y_sum - x_sum * xy_product_sum)
    slope = (1 / delta) * (num * xy_product_sum - x_sum * y_sum)

    y_mean = np.mean(y)
    # The small constant keeps the r2 division defined when y is constant.
    ss_tot = sum((y1 - y_mean) * (y1 - y_mean) for y1 in y) + 0.00001
    ss_err = sum((y[i] - slope * x1 - y_intercept) ** 2 for i, x1 in enumerate(x))
    rsq = 1 - ss_err / ss_tot

    return {'slope': round(slope, 4), 'intercept': round(y_intercept, 4), 'r2': round(rsq, 4)}
def cross_sectional_ic(df, x_col='open', y_col='n1b', method='spearman', **kwargs):
    """Analyse the cross-sectional correlation (IC) between ``x_col`` and ``y_col``.

    ``df`` is grouped by the date column and, within each group, the
    correlation between ``x_col`` and ``y_col`` is computed with
    ``Series.corr``.

    :param df: data, a DataFrame
    :param x_col: name of the X column
    :param y_col: name of the Y column, usually the next-period return (``n1b``)
    :param method: {'pearson', 'kendall', 'spearman'} or callable

        * pearson : standard correlation coefficient
        * kendall : Kendall Tau correlation coefficient
        * spearman : Spearman rank correlation
        * callable: callable with input two 1d ndarrays and returning a float
    :param kwargs: ``dt_col`` - name of the grouping (date) column, default ``'dt'``
    :return: ``(df, res)``. ``df`` holds one IC per group, with groups whose IC
        is NaN dropped whenever at least one group was produced. ``res`` is a
        dict of summary statistics: mean, standard deviation, ICIR, share of
        positive ICs, and share of ICs with absolute value above 2%. The
        statistics stay at their default 0 when no valid IC exists.
    """
    dt_col = kwargs.pop('dt_col', 'dt')

    tqdm.pandas(desc='cross_section_ic')
    s = df.groupby(dt_col).progress_apply(lambda group: group[x_col].corr(group[y_col], method=method))
    df = pd.DataFrame(s, columns=['ic']).reset_index()

    res = {
        "x_col": x_col,
        "y_col": y_col,
        "method": method,
        "IC均值": 0,
        "IC标准差": 0,
        "ICIR": 0,
        "IC胜率": 0,
        "IC绝对值>2%占比": 0,
    }
    if df.empty:
        return df, res

    # Drop NaN ICs BEFORE the second emptiness check: if every group yields a
    # NaN correlation, the ratios below would otherwise divide by zero.
    df = df[df['ic'].notnull()].copy()
    if df.empty:
        return df, res

    ic = df['ic']
    n_valid = len(df)
    ic_avg = ic.mean()
    ic_std = ic.std()

    res['IC均值'] = round(ic_avg, 4)
    res['IC标准差'] = round(ic_std, 4)
    # The std is NaN with a single valid IC and 0 when all ICs are equal; in
    # both cases ICIR is undefined and keeps the default 0.
    res['ICIR'] = round(ic_avg / ic_std, 4) if pd.notna(ic_std) and ic_std != 0 else 0
    res['IC胜率'] = round(int((ic > 0).sum()) / n_valid, 4)
    res['IC绝对值>2%占比'] = round(int((ic.abs() > 0.02).sum()) / n_valid, 4)
    return df, res