# -*- coding: utf-8 -*-
"""
author: zengbin93
email: zeng_bin8888@163.com
create_dt: 2022/1/29 15:01
describe: 相关系数计算、可视化
References:
1. https://zhuanlan.zhihu.com/p/362258222
2. https://blog.csdn.net/qq_45538220/article/details/107429201
"""
import numpy as np
import pandas as pd
from tqdm import tqdm
from typing import Union
[docs]def nmi_matrix(df: pd.DataFrame, heatmap=False) -> pd.DataFrame:
"""计算高维标准化互信息并以矩阵形式输出
:param df: 数据
:param heatmap: 是否绘制热力图
:return:
"""
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn import metrics
plt.rcParams["font.sans-serif"] = ["SimHei"] # 用来正常显示中文标签
plt.rcParams["axes.unicode_minus"] = False # 用来正常显示负号
cols = df.columns.to_list()
m_dict = {}
for i, col1 in tqdm(enumerate(cols), desc="nmi"):
X = df[col1]
for col2 in cols[i:]:
Y = df[col2]
nmi = metrics.normalized_mutual_info_score(X, Y)
m_dict[f"{col1}_{col2}"] = nmi
m_dict[f"{col2}_{col1}"] = nmi
m = []
for col1 in cols:
A = []
for col2 in cols:
A.append(m_dict[f"{col1}_{col2}"])
m.append(A)
dfm = pd.DataFrame(m, index=cols, columns=cols)
if heatmap:
print("NMI(标准化互信息) = \n", dfm)
plt.close()
figure, ax = plt.subplots(figsize=(len(cols), len(cols)))
sns.heatmap(dfm, square=True, annot=True, ax=ax)
plt.show()
return dfm
[docs]def single_linear(y: Union[np.array, list], x: Union[np.array, list] = None) -> dict:
"""单变量线性拟合
:param y: 目标序列
:param x: 单变量值
:return res: 拟合结果,样例如下
{'slope': 1.565, 'intercept': 67.9783, 'r2': 0.9967}
slope 标识斜率
intercept 截距
r2 拟合优度
"""
if not x:
x = list(range(len(y)))
x_squred_sum = sum([x1 * x1 for x1 in x])
xy_product_sum = sum([x[i] * y[i] for i in range(len(x))])
num = len(x)
x_sum = sum(x)
y_sum = sum(y)
delta = float(num * x_squred_sum - x_sum * x_sum)
if delta == 0:
return {"slope": 0, "intercept": 0, "r2": 0}
y_intercept = (1 / delta) * (x_squred_sum * y_sum - x_sum * xy_product_sum)
slope = (1 / delta) * (num * xy_product_sum - x_sum * y_sum)
y_mean = np.mean(y)
ss_tot = sum([(y1 - y_mean) * (y1 - y_mean) for y1 in y]) + 0.00001
ss_err = sum([(y[i] - slope * x[i] - y_intercept) * (y[i] - slope * x[i] - y_intercept) for i in range(len(x))])
rsq = 1 - ss_err / ss_tot
res = {"slope": round(slope, 4), "intercept": round(y_intercept, 4), "r2": round(rsq, 4)}
return res
[docs]def cross_sectional_ic(df, x_col="open", y_col="n1b", method="spearman", **kwargs):
"""分析 df 中 x_col 和 y_col 列的截面相关性(IC)
:param df:数据,DateFrame格式
:param x_col:X列
:param y_col:Y列,一般采用下期收益,也就是 n1b
:param method:{'pearson', 'kendall', 'spearman'} or callable
* pearson : standard correlation coefficient
* kendall : Kendall Tau correlation coefficient
* spearman : Spearman rank correlation
* callable: callable with input two 1d ndarrays and returning a float
:return:df,res: 前者是每日相关系数结果,后者是每日相关系数的统计结果
"""
dt_col = kwargs.pop("dt_col", "dt")
tqdm.pandas(desc="cross_section_ic")
s = df.groupby(dt_col).progress_apply(lambda row: row[x_col].corr(row[y_col], method=method))
df = pd.DataFrame(s, columns=["ic"]).reset_index(inplace=False)
res = {
"x_col": x_col,
"y_col": y_col,
"method": method,
"IC均值": 0,
"IC标准差": 0,
"ICIR": 0,
"IC胜率": 0,
"IC绝对值>2%占比": 0,
"累计IC回归R2": 0,
"累计IC回归斜率": 0,
"月胜率": 0,
"月均值": 0,
"年胜率": 0,
"年均值": 0,
}
if df.empty:
return df, res
df = df[~df["ic"].isnull()].copy()
ic_avg = df["ic"].mean()
ic_std = df["ic"].std()
res["IC均值"] = round(ic_avg, 4)
res["IC标准差"] = round(ic_std, 4)
res["ICIR"] = round(ic_avg / ic_std, 4) if ic_std != 0 else 0
if ic_avg > 0:
res["IC胜率"] = round(len(df[df["ic"] > 0]) / len(df), 4)
else:
res["IC胜率"] = round(len(df[df["ic"] < 0]) / len(df), 4)
res["IC绝对值>2%占比"] = round(len(df[df["ic"].abs() > 0.02]) / len(df), 4)
lr_ = single_linear(y=df["ic"].cumsum().to_list())
res.update({"累计IC回归R2": lr_["r2"], "累计IC回归斜率": lr_["slope"]})
monthly_ic = df.groupby(df["dt"].dt.strftime("%Y年%m月"))["ic"].mean().to_dict()
monthly_win_rate = len([1 for x in monthly_ic.values() if np.sign(x) == np.sign(res["IC均值"])]) / len(monthly_ic)
res["月胜率"] = round(monthly_win_rate, 4)
res["月均值"] = round(np.mean(list(monthly_ic.values())), 4)
yearly_ic = df.groupby(df["dt"].dt.strftime("%Y年"))["ic"].mean().to_dict()
yearly_win_rate = len([1 for x in yearly_ic.values() if np.sign(x) == np.sign(res["IC均值"])]) / len(yearly_ic)
res["年胜率"] = round(yearly_win_rate, 4)
res["年均值"] = round(np.mean(list(yearly_ic.values())), 4)
return df, res