创建存储前复权因子和行情的数据库和分区表:
if(not existsDatabase("dfs://stock_pre")){
create database "dfs://stock_pre"
partitioned by VALUE(2025.01M..2025.05M),
engine="OLAP"
}else{
print("The database has created.")
}
create table "dfs://stock_pre"."stock_pre"(
trade_date DATE[comment="交易日"],
ts_code SYMBOL[comment="股票代码"],
open DOUBLE[comment="开盘价"],
high DOUBLE[comment="最高价"],
low DOUBLE[comment="最低价"],
close DOUBLE[comment="收盘价"],
pre_close DOUBLE[comment="昨日收盘价"],
vol DOUBLE[comment="成交量"],
amount DOUBLE[comment="成交额"],
adj_pre DOUBLE[comment="前复权因子值"]
)
partitioned by trade_date
根据计算公式从未复权行情得到前复权因子和前复权行情,并且存入相应的库表:
def calOneCodeFunc(code){
/*@test
code = "600519.SH"
*/
//把1个票1天的未复权行情查到内存
one_code_data =
select *
from loadTable("dfs://stock_day_k", "stock_day_k")
where ts_code=code
//停牌交易日close价格为NULL,特殊处理当日的收盘价
update one_code_data
set fill_close=iif(close==NULL and pre_close==prev(close),
prev(close),
iif(close==NULL,pre_close,close))
//计算后复权因子
update one_code_data
set adj_back=iif(cumcount(fill_close)==1, 1, cumprod(prev(fill_close)\pre_close))
//计算前复权因子
update one_code_data
set adj_pre=adj_back/last(adj_back)
//包含复权因子和复权行情的表
result =
select
trade_date,
ts_code,
open*adj_pre as open,
high*adj_pre as high,
low*adj_pre as low,
close*adj_pre as close,
pre_close*adj_pre as pre_close,
vol,
amount,
adj_pre
from one_code_data
return result
}
//处理未复权行情表内全部股票
calCodes = exec distinct ts_code from loadTable("dfs://stock_day_k", "stock_day_k")
//按照股票代码并行计算
stock_pre = ploop(calOneCodeFunc, calCodes).unionAll()
//写入前复权分区表
loadTable("dfs://stock_pre", "stock_pre").append!(stock_pre)
部分重要除权日的数据查询:
select *
from loadTable("dfs://stock_pre", "stock_pre")
where ts_code="600519.SH", trade_date in [2002.07.25,2003.07.14,2004.07.01]
返回: