# -*- coding: utf-8 -*-
"""
author: zengbin93
email: zeng_bin8888@163.com
create_dt: 2022/10/27 23:23
describe: 用于信号计算函数的各种辅助工具函数
"""
import numpy as np
from collections import Counter, OrderedDict
from typing import List, Any, Dict, Union, Tuple
from czsc.enum import Direction
from czsc.objects import BI, RawBar, ZS, Signal
[docs]def create_single_signal(**kwargs) -> OrderedDict:
"""创建单个信号"""
s = OrderedDict()
k1, k2, k3 = kwargs.get('k1', '任意'), kwargs.get('k2', '任意'), kwargs.get('k3', '任意')
v1, v2, v3 = kwargs.get('v1', '任意'), kwargs.get('v2', '任意'), kwargs.get('v3', '任意')
v = Signal(k1=k1, k2=k2, k3=k3, v1=v1, v2=v2, v3=v3, score=kwargs.get('score', 0))
s[v.key] = v.value
return s
[docs]def is_symmetry_zs(bis: List[BI], th: float = 0.3) -> bool:
"""对称中枢判断:中枢中所有笔的力度序列,标准差小于均值的一定比例
https://pic2.zhimg.com/80/v2-2f55ef49eda01972462531ebb6de4f19_1440w.jpg
:param bis: 构成中枢的笔序列
:param th: 标准差小于均值的比例阈值
:return:
"""
if len(bis) % 2 == 0:
return False
zs = ZS(bis=bis)
if zs.zd > zs.zg or max([x.low for x in bis]) > min([x.high for x in bis]):
return False
zns = [x.power_price for x in bis]
if np.std(zns) / np.mean(zns) <= th:
return True
else:
return False
def check_cross_info(fast: [List, np.array], slow: [List, np.array]):
"""计算 fast 和 slow 的交叉信息
:param fast: 快线
:param slow: 慢线
:return:
"""
assert len(fast) == len(slow), "快线和慢线的长度不一样"
if isinstance(fast, list):
fast = np.array(fast)
if isinstance(slow, list):
slow = np.array(slow)
length = len(fast)
delta = fast - slow
cross_info = []
last_i = -1
last_v = 0
temp_fast = []
temp_slow = []
for i, v in enumerate(delta):
last_i += 1
last_v += abs(v)
temp_fast.append(fast[i])
temp_slow.append(slow[i])
if i >= 2 and delta[i - 1] <= 0 < delta[i]:
kind = "金叉"
elif i >= 2 and delta[i - 1] >= 0 > delta[i]:
kind = "死叉"
else:
continue
cross_info.append({'位置': i, "类型": kind, "快线": fast[i], "慢线": slow[i],
"距离": last_i, '距今': length - i,
"面积": round(last_v, 4), '价差': round(v, 4),
"快线高点": max(temp_fast), "快线低点": min(temp_fast),
"慢线高点": max(temp_slow), "慢线低点": min(temp_slow),
})
last_i = 0
last_v = 0
temp_fast = []
temp_slow = []
return cross_info
[docs]def check_pressure_support(bars: List[RawBar], q_seq: List[float] = None) -> Dict:
"""检查 bars 中的支撑、压力信息
1. 通过 round 函数对 K 线价格序列进行近似,统计价格出现次数,取出现次数超过5次的价位
2. 在出现次数最多的价格序列上计算分位数序列作为关键价格序列
:param bars: K线序列,按时间升序
:param q_seq: 分位数序列
:return:
"""
assert len(bars) >= 300, "分析至少需要300根K线"
min_low = min(x.low for x in bars)
price_seq = [y for x in bars for y in (x.open, x.close, x.high, x.low)]
price_seq = [round(x, 0) if min_low > 100 else round(x, 1) for x in price_seq]
lines = sorted([x for x, v in Counter(price_seq).most_common() if v >= 5])
q_seq = q_seq if q_seq else [0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1]
key_price = [np.quantile(lines, i, method='nearest') for i in q_seq]
kp_low = [x for x in key_price if x <= bars[-1].close]
kp_high = [x for x in key_price if x >= bars[-1].close]
info = {
"关键位": key_price,
"支撑位": kp_low,
"压力位": kp_high,
"第一支撑": kp_low[-1] if len(kp_low) >= 1 else -1,
"第二支撑": kp_low[-2] if len(kp_low) >= 2 else -1,
"第一压力": kp_high[0] if len(kp_high) >= 1 else -1,
"第二压力": kp_high[1] if len(kp_high) >= 2 else -1,
}
return info
[docs]def check_gap_info(bars: List[RawBar]):
"""检查 bars 中的缺口信息
:param bars: K线序列,按时间升序
:return:
"""
gap_info = []
if len(bars) < 2:
return gap_info
for i in range(1, len(bars)):
bar1, bar2 = bars[i-1], bars[i]
right = bars[i:]
gap = None
if bar1.high < bar2.low:
delta = round(bar2.low / bar1.high - 1, 4)
cover = "已补" if min(x.low for x in right) < bar1.high else "未补"
gap = {"kind": "向上缺口", 'cover': cover, 'sdt': bar1.dt, 'edt': bar2.dt,
'high': bar2.low, 'low': bar1.high, 'delta': delta}
if bar1.low > bar2.high:
delta = round(bar1.low / bar2.high - 1, 4)
cover = "已补" if max(x.high for x in right) > bar1.low else "未补"
gap = {"kind": "向下缺口", 'cover': cover, 'sdt': bar1.dt, 'edt': bar2.dt,
'high': bar1.low, 'low': bar2.high, 'delta': delta}
if gap:
gap_info.append(gap)
return gap_info
[docs]def fast_slow_cross(fast, slow):
"""计算 fast 和 slow 的交叉信息
:param fast: 快线
:param slow: 慢线
:return:
"""
assert len(fast) == len(slow), "快线和慢线的长度不一样"
if isinstance(fast, list):
fast = np.array(fast)
if isinstance(slow, list):
slow = np.array(slow)
length = len(fast)
delta = fast - slow
cross_info = []
last_i = -1
last_v = 0
temp_fast = []
temp_slow = []
for i, v in enumerate(delta):
last_i += 1
last_v += abs(v)
temp_fast.append(fast[i])
temp_slow.append(slow[i])
if i >= 2 and delta[i - 1] <= 0 < delta[i]:
kind = "金叉"
elif i >= 2 and delta[i - 1] >= 0 > delta[i]:
kind = "死叉"
else:
continue
cross_info.append({'位置': i, "类型": kind, "快线": fast[i], "慢线": slow[i],
"距离": last_i, '距今': length - i,
"面积": round(last_v, 4), '价差': round(v, 4),
"快线高点": max(temp_fast), "快线低点": min(temp_fast),
"慢线高点": max(temp_slow), "慢线低点": min(temp_slow),
})
last_i = 0
last_v = 0
temp_fast = []
temp_slow = []
return cross_info
[docs]def same_dir_counts(seq: [List, np.array]):
"""计算 seq 中与最后一个数字同向的数字数量
:param seq: 数字序列
:return:
example
----------
>>>print(same_dir_counts([-1, -1, -2, -3, 0, 1, 2, 3, -1, -2, 1, 1, 2, 3]))
>>>print(same_dir_counts([-1, -1, -2, -3, 0, 1, 2, 3]))
"""
s = seq[-1]
c = 0
for num in seq[::-1]:
if (num > 0 and s > 0) or (num < 0 and s < 0):
c += 1
else:
break
return c
[docs]def count_last_same(seq: Union[List, np.array, Tuple]):
"""统计与seq列表最后一个元素相似的连续元素数量
:param seq: 数字序列
:return:
"""
s = seq[-1]
c = 0
for _s in seq[::-1]:
if _s == s:
c += 1
else:
break
return c
[docs]def get_sub_elements(elements: List[Any], di: int = 1, n: int = 10) -> List[Any]:
"""获取截止到倒数第 di 个元素的前 n 个元素
:param elements: 全部元素列表
:param di: 指定结束元素为倒数第 di 个
:param n: 指定需要的元素个数
:return: 部分元素列表
>>>x = [1, 2, 3, 4, 5, 6, 7, 8, 9]
>>>y1 = get_sub_elements(x, di=1, n=3)
>>>y2 = get_sub_elements(x, di=2, n=3)
"""
assert di >= 1
if di == 1:
se = elements[-n:]
else:
se = elements[-n - di + 1: -di + 1]
return se
[docs]def is_bis_down(bis: List[BI]):
"""判断 bis 中的连续笔是否是向下的"""
if not bis or len(bis) < 3 or len(bis) % 2 == 0:
return False
assert bis[1].fx_b.dt > bis[0].fx_b.dt, "时间由远到近"
if bis[-1].direction == Direction.Down \
and bis[0].high == max([x.high for x in bis]) \
and bis[-1].low == min([x.low for x in bis]):
return True
else:
return False
[docs]def is_bis_up(bis: List[BI]):
"""判断 bis 中的连续笔是否是向上的"""
if not bis or len(bis) < 3 and len(bis) % 2 == 0:
return False
assert bis[1].fx_b.dt > bis[0].fx_b.dt, "时间由远到近"
if bis[-1].direction == Direction.Up \
and bis[-1].high == max([x.high for x in bis]) \
and bis[0].low == min([x.low for x in bis]):
return True
else:
return False
def get_zs_seq(bis: List[BI]) -> List[ZS]:
"""获取连续笔中的中枢序列
:param bis: 连续笔对象列表
:return: 中枢序列
"""
zs_list = []
if not bis:
return []
for bi in bis:
if not zs_list:
zs_list.append(ZS(bis=[bi]))
continue
zs = zs_list[-1]
if not zs.bis:
zs.bis.append(bi)
zs_list[-1] = zs
else:
if (bi.direction == Direction.Up and bi.high < zs.zd) \
or (bi.direction == Direction.Down and bi.low > zs.zg):
zs_list.append(ZS(bis=[bi]))
else:
zs.bis.append(bi)
zs_list[-1] = zs
return zs_list
def cross_zero_axis(n1: Union[List, np.ndarray], n2: Union[List, np.ndarray]) -> int:
"""判断两个数列的零轴交叉点
:param n1: 数列1
:param n2: 数列2
:return: 交叉点所在的索引位置
"""
assert len(n1) == len(n2), '输入两个数列长度不等'
axis_0 = np.zeros(len(n1))
n1 = np.flip(n1)
n2 = np.flip(n2)
x1 = np.where(n1[0] * n1 < axis_0, True, False)
x2 = np.where(n2[0] * n2 < axis_0, True, False)
num1 = np.argmax(x1[:-1] != x1[1:]) + 2 if np.any(x1) else 0
num2 = np.argmax(x2[:-1] != x2[1:]) + 2 if np.any(x2) else 0
return max(num1, num2)
def cal_cross_num(cross: List, distance: int = 1) -> tuple:
"""使用 distance 过滤掉fast_slow_cross函数返回值cross列表中
不符合要求的交叉点,返回处理后的金叉和死叉数值
:param cross: fast_slow_cross函数返回值
:param distance: 金叉和死叉之间的最小距离
:return: jc金叉值 ,SC死叉值
"""
if len(cross) == 0:
return 0, 0
elif len(cross) == 1:
cross_ = cross
elif len(cross) == 2:
if cross[-1]['距离'] < distance:
cross_ = []
else:
cross_ = cross
else:
if cross[-1]['距离'] < distance:
last_cross = cross[-1]
del cross[-2]
re_cross = [i for i in cross if i['距离'] >= distance]
re_cross.append(last_cross)
else:
re_cross = [i for i in cross if i['距离'] >= distance]
cross_ = []
for i in range(0, len(re_cross)):
if len(cross_) >= 1 and re_cross[i]['类型'] == re_cross[i - 1]['类型']:
# 不将上一个元素加入cross_
del cross_[-1]
cross_.append(re_cross[i])
else:
cross_.append(re_cross[i])
jc = len([x for x in cross_ if x['类型'] == '金叉'])
sc = len([x for x in cross_ if x['类型'] == '死叉'])
return jc, sc
def down_cross_count(x1: Union[List, np.array], x2: Union[List, np.array]) -> int:
"""输入两个序列,计算 x1 下穿 x2 的次数
:param x1: list
:param x2: list
:return: int
"""
x = np.array(x1) < np.array(x2)
num = 0
for i in range(len(x) - 1):
b1, b2 = x[i], x[i + 1]
if b2 and b1 != b2:
num += 1
return num