Source code for czsc.analyze

# -*- coding: utf-8 -*-
"""
author: zengbin93
email: zeng_bin8888@163.com
create_dt: 2021/3/10 11:21
describe: 缠论分型、笔的识别
"""
import os
import webbrowser
import numpy as np
from loguru import logger
from typing import List, Callable
from collections import OrderedDict
from czsc.enum import Mark, Direction
from czsc.objects import BI, FX, RawBar, NewBar
from czsc.utils.echarts_plot import kline_pro
from czsc import envs

logger.disable('czsc.analyze')


[docs]def remove_include(k1: NewBar, k2: NewBar, k3: RawBar): """去除包含关系:输入三根k线,其中k1和k2为没有包含关系的K线,k3为原始K线""" if k1.high < k2.high: direction = Direction.Up elif k1.high > k2.high: direction = Direction.Down else: k4 = NewBar(symbol=k3.symbol, id=k3.id, freq=k3.freq, dt=k3.dt, open=k3.open, close=k3.close, high=k3.high, low=k3.low, vol=k3.vol, elements=[k3]) return False, k4 # 判断 k2 和 k3 之间是否存在包含关系,有则处理 if (k2.high <= k3.high and k2.low >= k3.low) or (k2.high >= k3.high and k2.low <= k3.low): if direction == Direction.Up: high = max(k2.high, k3.high) low = max(k2.low, k3.low) dt = k2.dt if k2.high > k3.high else k3.dt elif direction == Direction.Down: high = min(k2.high, k3.high) low = min(k2.low, k3.low) dt = k2.dt if k2.low < k3.low else k3.dt else: raise ValueError if k3.open > k3.close: open_ = high close = low else: open_ = low close = high vol = k2.vol + k3.vol # 这里有一个隐藏Bug,len(k2.elements) 在一些及其特殊的场景下会有超大的数量,具体问题还没找到; # 临时解决方案是直接限定len(k2.elements)<=100 elements = [x for x in k2.elements[:100] if x.dt != k3.dt] + [k3] k4 = NewBar(symbol=k3.symbol, id=k2.id, freq=k2.freq, dt=dt, open=open_, close=close, high=high, low=low, vol=vol, elements=elements) return True, k4 else: k4 = NewBar(symbol=k3.symbol, id=k3.id, freq=k3.freq, dt=k3.dt, open=k3.open, close=k3.close, high=k3.high, low=k3.low, vol=k3.vol, elements=[k3]) return False, k4
[docs]def check_fx(k1: NewBar, k2: NewBar, k3: NewBar): """查找分型""" fx = None if k1.high < k2.high > k3.high and k1.low < k2.low > k3.low: fx = FX(symbol=k1.symbol, dt=k2.dt, mark=Mark.G, high=k2.high, low=k2.low, fx=k2.high, elements=[k1, k2, k3]) if k1.low > k2.low < k3.low and k1.high > k2.high < k3.high: fx = FX(symbol=k1.symbol, dt=k2.dt, mark=Mark.D, high=k2.high, low=k2.low, fx=k2.low, elements=[k1, k2, k3]) return fx
[docs]def check_fxs(bars: List[NewBar]) -> List[FX]: """输入一串无包含关系K线,查找其中所有分型""" fxs = [] for i in range(1, len(bars)-1): fx: FX = check_fx(bars[i-1], bars[i], bars[i+1]) if isinstance(fx, FX): # 这里可能隐含Bug,默认情况下,fxs本身是顶底交替的,但是对于一些特殊情况下不是这样,这是不对的。 # 临时处理方案,强制要求fxs序列顶底交替 if len(fxs) >= 2 and fx.mark == fxs[-1].mark: if envs.get_verbose(): logger.info(f"\n\ncheck_fxs: 输入数据错误{'+' * 100}") logger.info(f"当前:{fx.mark}, 上个:{fxs[-1].mark}") for bar in fx.raw_bars: logger.info(f"{bar}\n") logger.info(f'last fx raw bars: \n') for bar in fxs[-1].raw_bars: logger.info(f"{bar}\n") else: fxs.append(fx) return fxs
[docs]def check_bi(bars: List[NewBar], benchmark: float = None): """输入一串无包含关系K线,查找其中的一笔 :param bars: 无包含关系K线列表 :param benchmark: 当下笔能量的比较基准 :return: """ min_bi_len = envs.get_min_bi_len() fxs = check_fxs(bars) if len(fxs) < 2: return None, bars fx_a = fxs[0] try: if fxs[0].mark == Mark.D: direction = Direction.Up fxs_b = [x for x in fxs if x.mark == Mark.G and x.dt > fx_a.dt and x.fx > fx_a.fx] if not fxs_b: return None, bars fx_b = fxs_b[0] for fx in fxs_b[1:]: if fx.high >= fx_b.high: fx_b = fx elif fxs[0].mark == Mark.G: direction = Direction.Down fxs_b = [x for x in fxs if x.mark == Mark.D and x.dt > fx_a.dt and x.fx < fx_a.fx] if not fxs_b: return None, bars fx_b = fxs_b[0] for fx in fxs_b[1:]: if fx.low <= fx_b.low: fx_b = fx else: raise ValueError except: logger.exception("笔识别错误") return None, bars bars_a = [x for x in bars if fx_a.elements[0].dt <= x.dt <= fx_b.elements[2].dt] bars_b = [x for x in bars if x.dt >= fx_b.elements[0].dt] # 判断fx_a和fx_b价格区间是否存在包含关系 ab_include = (fx_a.high > fx_b.high and fx_a.low < fx_b.low) \ or (fx_a.high < fx_b.high and fx_a.low > fx_b.low) # 判断当前笔的涨跌幅是否超过benchmark的一定比例 if benchmark and abs(fx_a.fx - fx_b.fx) > benchmark * envs.get_bi_change_th(): power_enough = True else: power_enough = False # 成笔的条件:1)顶底分型之间没有包含关系;2)笔长度大于等于min_bi_len 或 当前笔的涨跌幅已经够大 if (not ab_include) and (len(bars_a) >= min_bi_len or power_enough): fxs_ = [x for x in fxs if fx_a.elements[0].dt <= x.dt <= fx_b.elements[2].dt] bi = BI(symbol=fx_a.symbol, fx_a=fx_a, fx_b=fx_b, fxs=fxs_, direction=direction, bars=bars_a) low_ubi = min([x.low for x in bars_b]) high_ubi = max([x.high for x in bars_b]) if (bi.direction == Direction.Up and high_ubi > bi.high) \ or (bi.direction == Direction.Down and low_ubi < bi.low): return None, bars else: return bi, bars_b else: return None, bars
[docs]class CZSC: def __init__(self, bars: List[RawBar], get_signals: Callable = None, max_bi_num=envs.get_max_bi_num(), ): """ :param bars: K线数据 :param max_bi_num: 最大允许保留的笔数量 :param get_signals: 自定义的信号计算函数 """ self.verbose = envs.get_verbose() self.max_bi_num = max_bi_num self.bars_raw: List[RawBar] = [] # 原始K线序列 self.bars_ubi: List[NewBar] = [] # 未完成笔的无包含K线序列 self.bi_list: List[BI] = [] self.symbol = bars[0].symbol self.freq = bars[0].freq self.get_signals = get_signals self.signals = None for bar in bars: self.update(bar) def __repr__(self): return "<CZSC~{}~{}>".format(self.symbol, self.freq.value) def __update_bi(self): bars_ubi = self.bars_ubi if len(bars_ubi) < 3: return # 查找笔 if not self.bi_list: # 第一个笔的查找 fxs = check_fxs(bars_ubi) if not fxs: return fx_a = fxs[0] fxs_a = [x for x in fxs if x.mark == fx_a.mark] for fx in fxs_a: if (fx_a.mark == Mark.D and fx.low <= fx_a.low) \ or (fx_a.mark == Mark.G and fx.high >= fx_a.high): fx_a = fx bars_ubi = [x for x in bars_ubi if x.dt >= fx_a.elements[0].dt] bi, bars_ubi_ = check_bi(bars_ubi) if isinstance(bi, BI): self.bi_list.append(bi) self.bars_ubi = bars_ubi_ return last_bi = self.bi_list[-1] # 如果上一笔被破坏,将上一笔的bars与bars_ubi进行合并 if (last_bi.direction == Direction.Up and bars_ubi[-1].high > last_bi.high) \ or (last_bi.direction == Direction.Down and bars_ubi[-1].low < last_bi.low): bars_ubi_a = last_bi.bars[:-1] + [x for x in bars_ubi if x.dt >= last_bi.bars[-1].dt] self.bi_list.pop(-1) else: bars_ubi_a = bars_ubi if self.verbose and len(bars_ubi_a) > 100: logger.info(f"czsc_update_bi: {self.symbol} - {self.freq} - {bars_ubi_a[-1].dt} 未完成笔延伸数量: {len(bars_ubi_a)}") if envs.get_bi_change_th() > 0.5 and len(self.bi_list) >= 5: benchmark = min(last_bi.power_price, np.mean([x.power_price for x in self.bi_list[-5:]])) else: benchmark = None bi, bars_ubi_ = check_bi(bars_ubi_a, benchmark) self.bars_ubi = bars_ubi_ if isinstance(bi, BI): self.bi_list.append(bi)
[docs] def update(self, bar: RawBar): """更新分析结果 :param bar: 单根K线对象 """ # 更新K线序列 if not self.bars_raw or bar.dt != self.bars_raw[-1].dt: self.bars_raw.append(bar) last_bars = [bar] else: # 当前 bar 是上一根 bar 的时间延伸 self.bars_raw[-1] = bar if len(self.bars_ubi) >= 3: edt = self.bars_ubi[1].dt self.bars_ubi = [x for x in self.bars_ubi if x.dt <= edt] last_bars = [x for x in self.bars_raw[-50:] if x.dt > edt] else: last_bars = self.bars_ubi[-1].elements last_bars[-1] = bar self.bars_ubi.pop(-1) # 去除包含关系 bars_ubi = self.bars_ubi for bar in last_bars: if len(bars_ubi) < 2: bars_ubi.append(NewBar(symbol=bar.symbol, id=bar.id, freq=bar.freq, dt=bar.dt, open=bar.open, close=bar.close, high=bar.high, low=bar.low, vol=bar.vol, elements=[bar])) else: k1, k2 = bars_ubi[-2:] has_include, k3 = remove_include(k1, k2, bar) if has_include: bars_ubi[-1] = k3 else: bars_ubi.append(k3) self.bars_ubi = bars_ubi # 更新笔 self.__update_bi() self.bi_list = self.bi_list[-self.max_bi_num:] if self.bi_list: sdt = self.bi_list[0].fx_a.elements[0].dt s_index = 0 for i, bar in enumerate(self.bars_raw): if bar.dt >= sdt: s_index = i break self.bars_raw = self.bars_raw[s_index:] if self.get_signals: self.signals = self.get_signals(c=self) else: self.signals = OrderedDict()
[docs] def to_echarts(self, width: str = "1400px", height: str = '580px', bs=None): """绘制K线分析图 :param width: 宽 :param height: 高 :param bs: 交易标记,默认为空 :return: """ kline = [x.__dict__ for x in self.bars_raw] if len(self.bi_list) > 0: bi = [{'dt': x.fx_a.dt, "bi": x.fx_a.fx} for x in self.bi_list] + \ [{'dt': self.bi_list[-1].fx_b.dt, "bi": self.bi_list[-1].fx_b.fx}] fx = [{'dt': x.dt, "fx": x.fx} for x in self.fx_list] else: bi = None fx = None chart = kline_pro(kline, bi=bi, fx=fx, width=width, height=height, bs=bs, title="{}-{}".format(self.symbol, self.freq.value)) return chart
[docs] def open_in_browser(self, width: str = "1400px", height: str = '580px'): """直接在浏览器中打开分析结果 :param width: 图表宽度 :param height: 图表高度 :return: """ home_path = os.path.expanduser("~") file_html = os.path.join(home_path, "temp_czsc.html") chart = self.to_echarts(width, height) chart.render(file_html) webbrowser.open(file_html)
@property def last_bi_extend(self): """判断最后一笔是否在延伸中,True 表示延伸中""" if self.bi_list[-1].direction == Direction.Up \ and max([x.high for x in self.bars_ubi]) > self.bi_list[-1].high: return True if self.bi_list[-1].direction == Direction.Down \ and min([x.low for x in self.bars_ubi]) < self.bi_list[-1].low: return True return False @property def finished_bis(self) -> List[BI]: """返回当下基本确认完成的笔列表""" if not self.bi_list: return [] else: if self.last_bi_extend: return self.bi_list[:-1] return self.bi_list @property def ubi_fxs(self) -> List[FX]: """返回当下基本确认完成的笔列表""" if not self.bars_ubi: return [] else: return check_fxs(self.bars_ubi) @property def fx_list(self) -> List[FX]: """返回当下基本确认完成的笔列表""" fxs = [] for bi_ in self.bi_list: fxs.extend(bi_.fxs[1:]) ubi = self.ubi_fxs for x in ubi: if x.dt > fxs[-1].dt: fxs.append(x) return fxs