快速开始
交易任务
从零开始构建一个交易任务
1. 根据目录结构创建文件
源码目录结构:
kfx-task-condition-demo/ # 交易任务文件名(英文名称,随意)
├── src/
│ └── python
| └── ConditionOrder # 交易任务名称(英文名称,随意)
| └── __init__.py # python交易任务策略代码
├── README.md # 交易任务说明
└── package.json # 编译配置信息
交易任务实现代码 __init__.py
1import kungfu
2from kungfu.wingchun.constants import *
3import json
4import time
5import math
6import threading
7from datetime import datetime
8from pykungfu import wingchun as wc
9
10yjj = kungfu.__binding__.yijinjing
11
12
class Config(object):
    """Typed view of the task arguments configured in the front end.

    ``param`` is the dict decoded from ``context.arguments``. ``accountId`` is
    split on "_" into ``source``/``account`` and ``ticker`` is split on "_"
    into ``exchange``/``ticker``. When either id does not have the expected
    number of parts (2 and 5 respectively) those four attributes stay empty
    strings, so callers can test ``config.source`` instead of hitting an
    AttributeError.

    NOTE(review): ``Side``, ``Offset`` and ``PriceType`` are presumably
    provided by ``from kungfu.wingchun.constants import *`` -- confirm.
    """

    def __init__(self, param):
        source_account = param["accountId"].split("_")
        exchange_ticker = param["ticker"].split("_")

        self.marketSource = param["marketSource"]
        self.side = Side(param["side"])
        self.offset = Offset(param["offset"])
        self.priceType = PriceType(param["priceType"])
        self.volume = int(param["volume"])
        # 0 means "no per-order limit".
        self.maxLot = int(param.get("maxLot", 0))
        # Nanoseconds; 0 means "no time trigger".
        self.startTime = str_to_nanotime(param.get("startTime", "0"))
        self.orderPrice = param["orderPrice"]

        # Always define the routing attributes, even for malformed ids.
        self.source = ""
        self.account = ""
        self.exchange = ""
        self.ticker = ""
        if len(source_account) == 2 and len(exchange_ticker) == 5:
            self.source, self.account = source_account
            self.exchange = exchange_ticker[0]
            self.ticker = exchange_ticker[1]

        # The price-condition table is optional in package.json.
        self.priceCondition = param.get("priceCondition") or []
32
33
class PriceCondition(object):
    """One row of the price-condition table, converted to numeric types.

    ``param`` must provide ``currentPrice`` and ``compare`` (converted with
    ``int``) and ``triggerPrice`` (converted with ``float``).
    """

    def __init__(self, param):
        self.currentPrice = int(param["currentPrice"])
        self.compare = int(param["compare"])
        self.triggerPrice = float(param["triggerPrice"])
39
40
def update_strategy_state(state, value, context):
    """Report a task state to the framework and log it at a matching level.

    ``value`` is stringified and used both as the state text and as the log
    message: Normal logs at info, Warn at warn, anything else at error.

    NOTE(review): ``lf`` is not bound by the visible imports; it is presumably
    provided by ``from kungfu.wingchun.constants import *`` -- confirm.
    """
    message = str(value)

    strategy_state = lf.types.StrategyStateUpdate()
    strategy_state.value = message

    if state == lf.enums.StrategyState.Normal:
        context.log.info(message)
    elif state == lf.enums.StrategyState.Warn:
        context.log.warn(message)
    else:
        context.log.error(message)

    strategy_state.state = state
    context.update_strategy_state(strategy_state)
57
58
def pre_start(context):
    """Parse the task arguments, register the account and subscribe to quotes.

    The task deregisters itself immediately when neither a trigger time nor a
    price condition is configured, or when the account/instrument ids could
    not be parsed (``Config.source`` left empty).
    """
    context.MIN_VOL = 0
    context.time_trigger = False
    context.price = -1.0  # negative means "no quote received yet"
    context.order_placed = False
    context.log.info("参数 {}".format(context.arguments))

    context.config = Config(json.loads(context.arguments))
    config = context.config

    context.trigger_info = ""
    if config.startTime > 0:
        trigger_time = datetime.fromtimestamp(config.startTime / (10**9))
        context.trigger_info = "时间满足" + trigger_time.strftime(
            "%Y-%m-%d %H:%M:%S.%f"
        )

    if (not config.priceCondition) and config.startTime == 0:
        update_strategy_state(
            lf.enums.StrategyState.Error,
            "触发时间和触发价格没设置.",
            context,
        )
        context.log.info("触发时间和触发价格都没设置")
        context.req_deregister()
        return

    if not config.source:
        # Without a parsed account/instrument there is nothing to subscribe
        # to or trade on; stop here instead of failing later.
        update_strategy_state(
            lf.enums.StrategyState.Error,
            "账户或标的格式不正确.",
            context,
        )
        context.req_deregister()
        return

    context.add_account(config.source, config.account)
    context.subscribe(config.marketSource, [config.ticker], config.exchange)

    update_strategy_state(
        lf.enums.StrategyState.Normal,
        "正常",
        context,
    )

    ins_type = wc.utils.get_instrument_type(config.exchange, config.ticker)
    context.log.info("(标的类型) {}".format(ins_type))
    # Minimum lot size used when splitting the order.
    context.MIN_VOL = type_to_minvol(ins_type)
104
105
def str_to_nanotime(tm):
    """Convert the front-end trigger time to nanoseconds since the epoch.

    Accepted inputs:
      * ``None``, ``""`` or ``"Invalid Date"`` -> 0 (no trigger time);
      * a number, or a string of digits, interpreted as milliseconds;
      * a string whose last whitespace-separated token is ``HH:MM:SS``
        (for example ``"2024-01-01 09:30:00"`` or ``"09:30:00"``); the time of
        day is applied to today's local date.

    Raises ``ValueError`` when a non-numeric string has no ``HH:MM:SS`` token.
    """
    if tm is None:
        return 0
    if isinstance(tm, (int, float)):  # already a millisecond timestamp
        return int(tm) * 10**6

    text = tm.strip()
    if text == "" or text == "Invalid Date":
        return 0
    if text.isdigit():  # in milliseconds
        return int(text) * 10**6

    today = time.strftime("%Y-%m-%d", time.localtime())
    clock = text.split()[-1]
    parsed = time.strptime(today + " " + clock, "%Y-%m-%d %H:%M:%S")
    return int(time.mktime(parsed)) * 10**9
117
118
def type_to_minvol(argument):
    """Return the minimum order lot for an instrument type.

    Stocks trade in lots of 100 and TechStock in lots of 200; every other
    type, including ones not listed here, falls back to 1.
    """
    min_volume_by_type = {
        InstrumentType.Stock: 100,
        InstrumentType.Future: 1,
        InstrumentType.Bond: 1,
        InstrumentType.StockOption: 1,
        InstrumentType.Fund: 1,
        InstrumentType.TechStock: 200,
        InstrumentType.Index: 1,
    }
    return min_volume_by_type.get(argument, 1)
130
131
def place_order(context):
    """Send the configured volume, split into slices, then end the task.

    Does nothing if the order was already placed. If no quote has arrived yet
    (``context.price`` is negative) the task reports a warning and
    deregisters without trading.

    Slice size:
      * no ``maxLot`` (0) or ``maxLot >= volume``: a single order for the
        whole volume, rounded up to a multiple of the minimum lot;
      * otherwise ``maxLot`` rounded DOWN to a multiple of the minimum lot
        (at least one lot), as described by ``maxLotTip`` in package.json, so
        a slice never exceeds the counter limit.
    The last slice is the remainder rounded up to a multiple of the lot.
    """
    if context.order_placed:
        return

    if context.price < 0:
        update_strategy_state(
            lf.enums.StrategyState.Warn,
            "没有收到行情",
            context,
        )
        context.log.error("没有收到行情, 无法下单, 请检查行情连接")
        context.req_deregister()
        return

    config = context.config
    # Guard against a lot size that was never initialised.
    lot = context.MIN_VOL if context.MIN_VOL > 0 else 1

    rest_volume = config.volume
    if config.maxLot == 0 or config.maxLot >= config.volume:
        slice_volume = int(math.ceil(float(rest_volume) / lot) * lot)
    else:
        slice_volume = max(config.maxLot // lot * lot, lot)

    placed = {}
    now_nano = time.time_ns()
    while rest_volume > 0:
        if slice_volume <= rest_volume:
            volume = slice_volume
        else:
            volume = int(math.ceil(float(rest_volume) / lot) * lot)
        order_id = context.insert_order(
            config.ticker,
            config.exchange,
            config.source,
            config.account,
            context.price,
            volume,
            config.priceType,
            config.side,
            config.offset,
        )
        placed[order_id] = volume
        # Subtract what was actually sent, not the nominal slice size.
        rest_volume -= volume
    context.order_placed = True

    time_str = datetime.fromtimestamp(now_nano / (10**9)).strftime(
        "%Y-%m-%d %H:%M:%S.%f"
    )
    context.log.info(
        "-------------------- {} 开始下单 时间 {} --------------------".format(
            context.trigger_info, time_str
        )
    )
    for order_id, volume in placed.items():
        context.log.info(
            "订单号 {}, 下单数量 {} 下单价格 {}".format(order_id, volume, context.price)
        )

    update_strategy_state(
        lf.enums.StrategyState.Normal,
        "下单完成, 退出任务",
        context,
    )
    context.log.info("下单完成, 退出任务")
    context.req_deregister()
195
196
def post_start(context):
    """Arm the time trigger: place the order when ``startTime`` is reached.

    No timer is registered when ``startTime`` is 0 (no time condition).
    """
    if context.config.startTime > 0:
        context.add_timer(
            context.config.startTime, lambda ctx, event: place_order(ctx)
        )
202
203
def on_quote(context, quote, source_location, dest):
    """Track the order price and fire the order once all price conditions hold.

    ``context.price`` is refreshed from every quote according to
    ``orderPrice``: "0" last price, "1" opposite-side level one, "2"
    same-side level one. If price conditions are configured, every row must
    be satisfied by this quote for the order to be placed; a row with an
    unknown ``compare`` value never triggers. ``context.trigger_info`` is
    updated only when all rows pass, so a later time trigger does not log a
    price reason that never fully held.
    """
    config = context.config

    if config.orderPrice == "0":
        context.price = quote.last_price
    elif config.orderPrice == "1":
        if config.side == Side.Buy:
            context.price = quote.ask_price[0]
        else:
            context.price = quote.bid_price[0]
    elif config.orderPrice == "2":
        if config.side == Side.Buy:
            context.price = quote.bid_price[0]
        else:
            context.price = quote.ask_price[0]

    if not config.priceCondition:
        return

    labels = {
        "1": "价格大于等于",
        "2": "价格大于",
        "3": "价格小于等于",
        "4": "价格小于",
    }

    trigger_info = context.trigger_info
    for item in config.priceCondition:
        # "1" -> best bid, "-1" -> best ask, anything else -> last price.
        if item["currentPrice"] == "1":
            quote_price = quote.bid_price[0]
        elif item["currentPrice"] == "-1":
            quote_price = quote.ask_price[0]
        else:
            quote_price = quote.last_price

        compare = item["compare"]
        if compare not in labels:
            return
        threshold = float(item["triggerPrice"])
        if compare == "1":
            satisfied = quote_price >= threshold
        elif compare == "2":
            satisfied = quote_price > threshold
        elif compare == "3":
            satisfied = quote_price <= threshold
        else:
            satisfied = quote_price < threshold
        if not satisfied:
            return
        trigger_info = labels[compare] + str(item["triggerPrice"])

    context.trigger_info = trigger_info
    place_order(context)
配置文件package.json
1{
2 "name": "@kungfu-trader/kfx-task-condition",
3 "author": {
4 "name": "kungfu-trader",
5 "email": "info@kungfu.link"
6 },
7 "kungfuBuild": {
8 "python": {
9 "dependencies": {}
10 }
11 },
12 "kungfuConfig": {
13 "key": "ConditionOrder",
14 "name": "条件单",
15 "ui_config": {
16 "position": "make_order"
17 },
18 "language": {
19 "zh-CN": {
20 "accountId": "账户",
21 "marketSource": "行情",
22 "ticker": "标的",
23 "side": "买卖",
24 "offset": "开平",
25 "priceType": "下单类型",
26 "priceCondition": "价格条件",
27 "currentPrice": "当前价格",
28 "currentPrice_0": "买一价",
29 "currentPrice_1": "卖一价",
30 "currentPrice_2": "最新价",
31 "compare": "比较符",
32 "triggerPrice": "触发价格",
33 "orderPrice": "下单价格",
34 "orderPrice_0": "最新价",
35 "orderPrice_1": "对手价一档",
36 "orderPrice_2": "同方向一档",
37 "volume": "数量",
38 "maxLot": "单次最大手数",
39 "maxLotTip": "柜台允许的单次最大手数, 以此为基础进行拆单, 不填则表示柜台无限制, 股票请填100的整数倍, 否则自动向下取整, 小于100则会强制设成100",
40 "startTime": "触发时间"
41 },
42 "en-US": {
43 "accountId": "Account Id",
44 "marketSource": "Market Source",
45 "ticker": "Ticker",
46 "side": "Side",
47 "offset": "Offset",
48 "priceType": "Price Type",
49 "priceCondition": "Price Condition",
50 "currentPrice": "Current Price",
51 "currentPrice_0": "Buy First Price",
52 "currentPrice_1": "Sell First Price",
53 "currentPrice_2": "Latest Price",
54 "compare": "Compare",
55 "triggerPrice": "Trigger Price",
56 "orderPrice": "Order Price",
57 "orderPrice_0": "Latest Price",
58 "orderPrice_1": "Opponent First Level Price",
59 "orderPrice_2": "Same Side First Level Price",
60 "volume": "Volume",
61 "maxLot": "Max Lot",
62 "maxLotTip": "Maximum quantity per order allowed by the counter; larger orders are split on this basis. Leave empty if the counter has no limit. For stocks, enter a multiple of 100; other values are rounded down automatically, and values below 100 are set to 100.",
63 "startTime": "Trigger Time"
64 }
65 },
66 "config": {
67 "strategy": {
68 "type": "trade",
69 "settings": [
70 {
71 "key": "accountId",
72 "name": "ConditionOrder.accountId",
73 "type": "td",
74 "required": true,
75 "showArg": true
76 },
77 {
78 "key": "marketSource",
79 "name": "ConditionOrder.marketSource",
80 "type": "md",
81 "required": true,
82 "showArg": true
83 },
84 {
85 "key": "ticker",
86 "name": "ConditionOrder.ticker",
87 "type": "instrument",
88 "required": true,
89 "showArg": true
90 },
91 {
92 "key": "side",
93 "name": "ConditionOrder.side",
94 "type": "side",
95 "default": 0,
96 "required": true,
97 "showArg": true
98 },
99 {
100 "key": "offset",
101 "name": "ConditionOrder.offset",
102 "type": "offset",
103 "default": 0,
104 "required": true,
105 "showArg": true
106 },
107 {
108 "key": "priceType",
109 "name": "ConditionOrder.priceType",
110 "type": "priceType",
111 "default": "1",
112 "required": false
113 },
114 {
115 "key": "priceCondition",
116 "name": "ConditionOrder.priceCondition",
117 "type": "table",
118 "columns": [
119 {
120 "key": "currentPrice",
121 "name": "ConditionOrder.currentPrice",
122 "type": "select",
123 "options": [
124 {
125 "label": "ConditionOrder.currentPrice_0",
126 "value": "1"
127 },
128 {
129 "label": "ConditionOrder.currentPrice_1",
130 "value": "-1"
131 },
132 {
133 "label": "ConditionOrder.currentPrice_2",
134 "value": "0"
135 }
136 ],
137 "default": "0",
138 "required": true
139 },
140 {
141 "key": "compare",
142 "name": "ConditionOrder.compare",
143 "type": "select",
144 "options": [
145 {
146 "label": ">=",
147 "value": "1"
148 },
149 {
150 "label": ">",
151 "value": "2"
152 },
153 {
154 "label": "<=",
155 "value": "3"
156 },
157 {
158 "label": "<",
159 "value": "4"
160 }
161 ],
162 "default": "1",
163 "required": true
164 },
165 {
166 "key": "triggerPrice",
167 "name": "ConditionOrder.triggerPrice",
168 "type": "float",
169 "required": true
170 }
171 ],
172 "required": false
173 },
174 {
175 "key": "orderPrice",
176 "name": "ConditionOrder.orderPrice",
177 "type": "select",
178 "options": [
179 {
180 "label": "ConditionOrder.orderPrice_0",
181 "value": "0"
182 },
183 {
184 "label": "ConditionOrder.orderPrice_1",
185 "value": "1"
186 },
187 {
188 "label": "ConditionOrder.orderPrice_2",
189 "value": "2"
190 }
191 ],
192 "required": true
193 },
194 {
195 "key": "volume",
196 "name": "ConditionOrder.volume",
197 "type": "int",
198 "min": 0,
199 "required": true
200 },
201 {
202 "key": "maxLot",
203 "name": "ConditionOrder.maxLot",
204 "type": "int",
205 "min": 0,
206 "tip": "ConditionOrder.maxLotTip",
207 "required": false,
208 "default": 0
209 },
210 {
211 "key": "startTime",
212 "name": "ConditionOrder.startTime",
213 "type": "timePicker",
214 "required": false
215 }
216 ]
217 }
218 }
219 }
220}
说明文档 README.md
1条件单逻辑说明 :
2
3- 条件单可以接受两个类型的条件为约束,一个是价格条件,一个是时间条件
4- 当仅有价格条件时,会在所选价格与触发价格满足设定的比较关系(>=、>、<=、<)时下单
5- 当仅有时间条件时 会在到达目标设定时间点时下单
6- 当价格条件跟时间条件同时存在时,哪个条件先满足,以哪个条件下单
7- 单次最大手数:若设置下单数量为1000,而单次最大手数为100,则下单时会拆为10笔,每笔100,一同下出。
2. 编译生成二进制文件
编译流程:
(1)在命令行中进入交易任务文件根目录下
$ cd kfx-task-condition-demo/
# 举例交易任务文件名为 kfx-task-condition-demo
(2)执行kfs编译命令
$ kfs extension build
注意
执行编译的kfs命令路径为功夫的安装目录下 /Kungfu/resources/kfc/kfs
举例: Windows系统下,功夫安装路径为D盘的根目录,即功夫安装目录为 D:/Kungfu
编译命令为 : D:/Kungfu/resources/kfc/kfs.exe extension build
举例: linux系统下,功夫安装路径为/opt/Kungfu
编译命令为 : /opt/Kungfu/resources/kfc/kfs extension build
编译后文件目录结构:
kfx-task-condition-demo/
├── src/
│ └── python
| └── ConditionOrder
| └── __init__.py
├── README.md
├── package.json
├── __pypackages__/ # Python模块库, 自动生成
├── dist/ # 编译打包出来的二进制文件所在文件夹
| └── ConditionOrder
| └── ConditionOrder.cp39-win_amd64.pyd # 二进制文件
├── pdm.lock # build后下载依赖库自动生成的文件
└── pyproject.toml # build后下载依赖库自动生成的文件
3. 将文件拷贝到插件目录
流程:
(1)在命令行中进入交易任务文件根目录下
$ cd kfx-task-condition-demo/
# 举例交易任务文件名为 kfx-task-condition-demo
(2)复制二进制文件所在的目录拷贝到Kungfu插件目录
注意
插件目录路径为功夫的安装目录下 /Kungfu/resources/app/kungfu-extensions/
Windows系统 : Copy-Item -Path ./dist/ConditionOrder/ -Destination D:/Kungfu/resources/app/kungfu-extensions/ConditionOrder/ -Recurse -Force
Linux系统 : cp -r ./dist/ConditionOrder/ /opt/Kungfu/resources/app/kungfu-extensions/
4. 添加交易任务
重启Kungfu图形客户端,选择主面板中的”交易任务”面板,点击右上角的”添加”按钮,在弹出的”选择交易任务”面板中选择”条件单”

交易任务和策略的区别
功能描述 |
交易任务 |
策略 |
前端参数 |
可传参(交易任务可以接收前端配置的参数,前端配置的参数会以一个json字符串格式传入到 context.arguments。) |
不可传参 |
交易进度信息统计展示 |
展示进度信息(交易任务可以在成交和委托回调中,统计成交进度,或是属于交易任务特有的指标,前端界面可以显示这些信息,同时还可以进行异常报警提示。) |
不展示 |
柜台对接
从零开始构建一个交易柜台
创建目录结构
源码目录结构:
xtp/ # xtp柜台名称
├── src/
│ └── cpp
│ ├── buffer_data.h
│ ├── exports.cpp
│ ├── marketdata_xtp.cpp
│ ├── marketdata_xtp.h
│ ├── serialize_xtp.h
│ ├── trader_xtp.cpp
│ ├── trader_xtp.h
│ └── type_convert.h
└── package.json # 编译配置信息
package.json文件
1{
2 "name": "@kungfu-trader/kfx-broker-xtp-demo",
3 "author": {
4 "name": "Kungfu Trader",
5 "email": "info@kungfu.link"
6 },
7 "version": "3.0.6-alpha.4",
8 "description": "Kungfu Extension - XTP Demo",
9 "license": "Apache-2.0",
10 "main": "package.json",
11 "repository": {
12 "url": "https://github.com/kungfu-trader/kungfu.git"
13 },
14 "publishConfig": {
15 "registry": "https://npm.pkg.github.com"
16 },
17 "binary": {
18 "module_name": "kfx-broker-xtp-demo",
19 "module_path": "dist/xtp",
20 "remote_path": "{module_name}/v{major}/v{version}",
21 "package_name": "{module_name}-v{version}-{platform}-{arch}-{configuration}.tar.gz",
22 "host": "https://prebuilt.libkungfu.cc"
23 },
24 "scripts": {
25 "build": "C:/Users/PC/Documents/kfgit/v27/kf27/artifact/build/stage/artifact-kungfu/v2/v3.0.6-alpha.4/win-unpacked/resources/kfc/kfs.exe extension build", // 替换成你的Kungfu安装目录
26 "clean": "C:/Users/PC/Documents/kfgit/v27/kf27/artifact/build/stage/artifact-kungfu/v2/v3.0.6-alpha.4/win-unpacked/resources/kfc/kfs.exe extension clean", // 替换成你的Kungfu安装目录
27 "format": "node ../../framework/core/.gyp/run-format-cpp.js src",
28 "install": "node -e \"require('@kungfu-trader/kungfu-core').prebuilt('install')\"",
29 "package": "kfs extension package"
30 },
31 "dependencies": {
32 "@kungfu-trader/kungfu-core": "^3.0.6-alpha.4"
33 },
34 "devDependencies": {
35 "@kungfu-trader/kungfu-sdk": "^3.0.6-alpha.4"
36 },
37 "kungfuDependencies": {
38 "xtp": "v2.2.37.4"
39 },
40 "kungfuBuild": {
41 "build_type": "Release",
42 "cpp": {
43 "target": "bind/python",
44 "links": {
45 "windows": [
46 "xtptraderapi",
47 "xtpquoteapi"
48 ],
49 "linux": [
50 "xtptraderapi",
51 "xtpquoteapi"
52 ],
53 "macos": [
54 "xtptraderapi",
55 "xtpquoteapi"
56 ]
57 }
58 }
59 },
60 "kungfuConfig": {
61 "key": "xtp",
62 "name": "XTP",
63 "language": {
64 "zh-CN": {
65 "account_name": "账户别名",
66 "account_name_tip": "请填写账户别名",
67 "account_id": "账户",
68 "account_id_tip": "请填写账户, 例如:1504091",
69 "password": "密码",
70 "password_tip": "请填写密码, 例如:123456",
71 "software_key": "软件密钥",
72 "software_key_tip": "请填写软件密钥, 例如:b8aa7173bba3470e390d787219b2112",
73 "td_ip": "交易IP",
74 "td_ip_tip": "请填写交易IP, 例如:61.152.102.111",
75 "td_port": "交易端口",
76 "td_port_tip": "请填写交易端口, 例如:8601",
77 "client_id": "客户ID",
78 "client_id_tip": "请填写自定义多点登录ID 1-99整数,不同节点之间不要重复",
79 "md_ip": "行情IP",
80 "md_ip_tip": "请填写行情IP, 例如:61.152.102.110",
81 "md_port": "行情端口",
82 "md_port_tip": "请填写行情端口, 例如:8602",
83 "protocol": "协议",
84 "protocol_tip": "请选择协议, tcp 或者 udp",
85 "buffer_size": "缓冲区大小",
86 "buffer_size_tip": "请填写 缓冲区大小(mb)",
87 "sync_external_order": "同步外部订单",
88 "sync_external_order_msg": "是否同步外部订单",
89 "sync_external_order_tip": "若开启则同步用户在其他交易软件的订单",
90 "recover_order_trade": "恢复订单",
91 "recover_order_trade_msg": "启动时是否查询恢复订单",
92 "recover_order_trade_tip": "若开启则启动时查询今日委托和成交",
93 "query_instruments": "查询可交易标的",
94 "query_instruments_tip": "是否查询可交易标的, 开启后会查询所有可交易标的, 流量太大频繁查询可能导致账号或ip被XTP拉黑"
95 },
96 "en-US": {
97 "account_name": "account name",
98 "account_name_tip": "Please enter account name",
99 "account_id": "account id",
100 "account_id_tip": "Please enter account id",
101 "password": "password",
102 "password_tip": "Please enter password, for example:123456",
103 "software_key": "software key",
104 "software_key_tip": "Please enter software key, for example :b8aa7173bba3470e390d787219b2112",
105 "td_ip": "td IP",
106 "td_ip_tip": "Please enter td IP, for example:61.152.102.111",
107 "td_port": "td port",
108 "td_port_tip": "Please enter td port, for example:8601",
109 "client_id": "client id",
110 "client_id_tip": "Please enter a user-defined multipoint client ID, which is an integer ranging from 1 to 99. The value must be unique on different nodes",
111 "md_ip": "md IP",
112 "md_ip_tip": "Please enter md IP, for example :61.152.102.110",
113 "md_port": "md port",
114 "md_port_tip": "Please enter md port, for example:8602",
115 "protocol": "protocol",
116 "protocol_tip": "Please select protocol, tcp or udp",
117 "buffer_size": "buffer size",
118 "buffer_size_tip": "Please enter buffer size(mb)",
119 "sync_external_order": "sync external order",
120 "sync_external_order_msg": "Whether open sync_external_order",
121 "sync_external_order_tip": "If enabled, it synchronizes users' orders in other trading software",
122 "recover_order_trade": "recover order trade",
123 "recover_order_trade_msg": "Whether recover order trade",
124 "recover_order_trade_tip": "If enabled, query order and trade when TD ready",
125 "query_instruments": "query instruments",
126 "query_instruments_tip": "If enabled, query instruments. Too much information may result in the account or IP being blacklisted by XTP"
127 }
128 },
129 "config": {
130 "td": {
131 "type": [
132 "stock"
133 ],
134 "settings": [
135 {
136 "key": "account_name",
137 "name": "xtp.account_name",
138 "type": "str",
139 "tip": "xtp.account_name_tip"
140 },
141 {
142 "key": "account_id",
143 "name": "xtp.account_id",
144 "type": "str",
145 "required": true,
146 "primary": true,
147 "tip": "xtp.account_id_tip"
148 },
149 {
150 "key": "password",
151 "name": "xtp.password",
152 "type": "password",
153 "required": true,
154 "tip": "xtp.password_tip"
155 },
156 {
157 "key": "software_key",
158 "name": "xtp.software_key",
159 "type": "str",
160 "required": true,
161 "tip": "xtp.software_key_tip"
162 },
163 {
164 "key": "td_ip",
165 "name": "xtp.td_ip",
166 "type": "str",
167 "required": true,
168 "tip": "xtp.td_ip_tip"
169 },
170 {
171 "key": "td_port",
172 "name": "xtp.td_port",
173 "type": "int",
174 "required": true,
175 "tip": "xtp.td_port_tip"
176 },
177 {
178 "key": "client_id",
179 "name": "xtp.client_id",
180 "type": "int",
181 "required": true,
182 "tip": "xtp.client_id_tip"
183 },
184 {
185 "key": "sync_external_order",
186 "name": "xtp.sync_external_order",
187 "type": "bool",
188 "errMsg": "xtp.sync_external_order_msg",
189 "required": false,
190 "default": false,
191 "tip": "xtp.sync_external_order_tip"
192 },
193 {
194 "key": "recover_order_trade",
195 "name": "xtp.recover_order_trade",
196 "type": "bool",
197 "errMsg": "xtp.recover_order_trade_msg",
198 "required": false,
199 "default": false,
200 "tip": "xtp.recover_order_trade_tip"
201 }
202 ]
203 },
204 "md": {
205 "type": [
206 "stock"
207 ],
208 "settings": [
209 {
210 "key": "account_id",
211 "name": "xtp.account_id",
212 "type": "str",
213 "required": true,
214 "tip": "xtp.account_id_tip",
215 "default": "15011218"
216 },
217 {
218 "key": "password",
219 "name": "xtp.password",
220 "type": "password",
221 "required": true,
222 "tip": "xtp.password_tip",
223 "default": "PsVqy99v"
224 },
225 {
226 "key": "md_ip",
227 "name": "xtp.md_ip",
228 "type": "str",
229 "required": true,
230 "tip": "xtp.md_ip_tip",
231 "default": "119.3.103.38"
232 },
233 {
234 "key": "md_port",
235 "name": "xtp.md_port",
236 "type": "int",
237 "required": true,
238 "tip": "xtp.md_port_tip",
239 "default": 6002
240 },
241 {
242 "key": "protocol",
243 "name": "xtp.protocol",
244 "type": "select",
245 "options": [
246 {
247 "value": "tcp",
248 "label": "tcp"
249 },
250 {
251 "value": "udp",
252 "label": "udp"
253 }
254 ],
255 "required": false,
256 "tip": "xtp.protocol_tip",
257 "default": "tcp"
258 },
259 {
260 "key": "buffer_size",
261 "name": "xtp.buffer_size",
262 "type": "int",
263 "tip": "xtp.buffer_size_tip",
264 "required": false
265 },
266 {
267 "key": "client_id",
268 "name": "xtp.client_id",
269 "type": "int",
270 "required": true,
271 "tip": "xtp.client_id_tip",
272 "default": 23
273 },
274 {
275 "key": "query_instruments",
276 "name": "xtp.query_instruments",
277 "type": "bool",
278 "required": false,
279 "tip": "xtp.query_instruments_tip",
280 "default": false
281 }
282 ]
283 }
284 }
285 }
286}
serialize_xtp.h文件
为XTP数据结构添加to_string,方便打印日志查看数据
1#ifndef KUNGFU_SERIALIZE_XTP_H
2#define KUNGFU_SERIALIZE_XTP_H
3#include <nlohmann/json.hpp>
4#include <xtp_api_struct.h>
5namespace nlohmann {
6NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(XTPQueryOrderRsp, order_xtp_id, order_client_id, order_cancel_client_id,
7 order_cancel_xtp_id, ticker, market, price, quantity, price_type, side,
8 position_effect, reserved1, reserved2, business_type, qty_traded, qty_left,
9 insert_time, update_time, cancel_time, trade_amount, order_local_id, order_status,
10 order_submit_status, order_type);
11NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(XTPOrderInsertInfo, order_xtp_id, order_client_id, ticker, market, price, stop_price,
12 quantity, price_type, side, position_effect, reserved1, reserved2, business_type);
13NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(XTPTradeReport, order_xtp_id, order_client_id, ticker, market, local_order_id,
14 exec_id, price, quantity, trade_time, trade_amount, report_index, order_exch_id,
15 trade_type, side, position_effect, reserved1, reserved2, business_type, branch_pbu);
16NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(XTPOrderCancelInfo, order_cancel_xtp_id, order_xtp_id);
17NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(XTPQueryStkPositionRsp, ticker, ticker_name, market, total_qty, sellable_qty,
18 avg_price, unrealized_pnl, yesterday_position, purchase_redeemable_qty,
19 position_direction, position_security_type, executable_option, lockable_position,
20 executable_underlying, locked_position, usable_locked_position, profit_price,
21 buy_cost, profit_cost, unknown);
22NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(XTPQueryAssetRsp, total_asset, buying_power, security_asset, fund_buy_amount,
23 fund_buy_fee, fund_sell_amount, fund_sell_fee, withholding_amount, account_type,
24 frozen_margin, frozen_exec_cash, frozen_exec_fee, pay_later, preadva_pay,
25 orig_banlance, banlance, deposit_withdraw, trade_netting, captial_asset,
26 force_freeze_amount, preferred_amount, repay_stock_aval_banlance,
27 exchange_cur_risk_degree, company_cur_risk_degree, unknown);
28NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(XTPMarketDataStruct, exchange_id, ticker, last_price, pre_close_price, open_price,
29 high_price, low_price, close_price, pre_total_long_positon, total_long_positon,
30 pre_settl_price, settl_price, upper_limit_price, lower_limit_price, pre_delta,
31 curr_delta, data_time, qty, turnover, avg_price, bid, ask, bid_qty, ask_qty,
32 trades_count, ticker_status);
33NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(XTPRspInfoStruct, error_id, error_msg);
34NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(XTPOrderInfoEx, order_xtp_id, order_client_id, order_cancel_client_id,
35 order_cancel_xtp_id, ticker, market, price, quantity, price_type, business_type,
36 qty_traded, qty_left, insert_time, update_time, cancel_time, trade_amount,
37 order_local_id, order_status, order_submit_status, order_type, order_exch_id,
38 order_err_t, unknown);
39NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(XTPSpecificTickerStruct, exchange_id, ticker);
40} // namespace nlohmann
41namespace kungfu::wingchun::xtp {
42template <typename T> std::string to_string(const T &ori) {
43 nlohmann::json j;
44 to_json(j, ori);
45 return j.dump();
46}
47} // namespace kungfu::wingchun::xtp
48#endif
type_convert.h文件
XTP和Kungfu数据结构的转换
1#ifndef KUNGFU_XTP_EXT_TYPE_CONVERT_H
2#define KUNGFU_XTP_EXT_TYPE_CONVERT_H
3
4#include <cstddef>
5#include <cstdio>
6#include <cstring>
7#include <ctime>
8#include <kungfu/longfist/longfist.h>
9#include <kungfu/wingchun/common.h>
10#include <kungfu/yijinjing/time.h>
11#include <nlohmann/json.hpp>
12#include <xtp_api_struct.h>
13
14using namespace kungfu::longfist;
15using namespace kungfu::longfist::enums;
16using namespace kungfu::longfist::types;
17
18namespace kungfu::wingchun::xtp {
19
20template <typename T> inline void set_offset(T &t) {
21 switch (t.side) {
22 case Side::Buy:
23 t.offset = Offset::Open;
24 break;
25 case Side::Sell:
26 t.offset = Offset::Close;
27 break;
28 default:
29 SPDLOG_ERROR("Invalidated kf_side : {} ", t.side);
30 break;
31 }
32}
33
34inline XTP_PROTOCOL_TYPE get_xtp_protocol_type(const std::string &p) {
35 if (p == "udp") {
36 return XTP_PROTOCOL_UDP;
37 } else {
38 return XTP_PROTOCOL_TCP;
39 }
40}
41
42inline int64_t nsec_from_xtp_timestamp(int64_t xtp_time) {
43 std::tm result = {};
44 result.tm_year = xtp_time / (int64_t)1e13 - 1900;
45 result.tm_mon = xtp_time % (int64_t)1e13 / (int64_t)1e11 - 1;
46 result.tm_mday = xtp_time % (int64_t)1e11 / (int64_t)1e9;
47 result.tm_hour = xtp_time % (int64_t)1e9 / (int64_t)1e7;
48 result.tm_min = xtp_time % (int)1e7 / (int)1e5;
49 result.tm_sec = xtp_time % (int)1e5 / (int)1e3;
50 int milli_sec = xtp_time % (int)1e3;
51 std::time_t parsed_time = std::mktime(&result);
52 return parsed_time * kungfu::yijinjing::time_unit::NANOSECONDS_PER_SECOND +
53 milli_sec * kungfu::yijinjing::time_unit::NANOSECONDS_PER_MILLISECOND;
54}
55
56inline void from_xtp(const XTP_MARKET_TYPE &xtp_market_type, char *exchange_id) {
57 if (xtp_market_type == XTP_MKT_SH_A) {
58 strcpy(exchange_id, "SSE");
59 } else if (xtp_market_type == XTP_MKT_SZ_A) {
60 strcpy(exchange_id, "SZE");
61 }
62}
63
64inline void to_xtp(XTP_MARKET_TYPE &xtp_market_type, const char *exchange_id) {
65 if (!strcmp(exchange_id, "SSE")) {
66 xtp_market_type = XTP_MKT_SH_A;
67 } else if (!strcmp(exchange_id, "SZE")) {
68 xtp_market_type = XTP_MKT_SZ_A;
69 } else {
70 xtp_market_type = XTP_MKT_UNKNOWN;
71 }
72}
73
74inline std::string exchange_id_from_xtp(const XTP_EXCHANGE_TYPE ex) {
75 if (ex == XTP_EXCHANGE_SH) {
76 return EXCHANGE_SSE;
77 } else if (ex == XTP_EXCHANGE_SZ) {
78 return EXCHANGE_SZE;
79 } else {
80 return "Unknown";
81 }
82}
83
84inline void from_xtp(const XTP_EXCHANGE_TYPE &xtp_exchange_type, char *exchange_id) {
85 if (xtp_exchange_type == XTP_EXCHANGE_SH) {
86 strcpy(exchange_id, "SSE");
87 } else if (xtp_exchange_type == XTP_EXCHANGE_SZ) {
88 strcpy(exchange_id, "SZE");
89 }
90}
91
92inline void to_xtp_exchange(XTP_EXCHANGE_TYPE &xtp_exchange_type, const char *exchange_id) {
93 if (strcmp(exchange_id, "SSE") == 0) {
94 xtp_exchange_type = XTP_EXCHANGE_SH;
95 } else if (strcmp(exchange_id, "SZE") == 0) {
96 xtp_exchange_type = XTP_EXCHANGE_SZ;
97 } else {
98 xtp_exchange_type = XTP_EXCHANGE_UNKNOWN;
99 }
100}
101
102inline void from_xtp(const XTP_PRICE_TYPE &xtp_price_type, const XTP_MARKET_TYPE &xtp_exchange_type,
103 PriceType &price_type) {
104 if (xtp_price_type == XTP_PRICE_LIMIT)
105 price_type = PriceType::Limit;
106 else if (xtp_price_type == XTP_PRICE_BEST5_OR_CANCEL)
107 price_type = PriceType::FakBest5;
108 else if (xtp_exchange_type == XTP_MKT_SH_A) {
109 if (xtp_price_type == XTP_PRICE_BEST5_OR_LIMIT)
110 price_type = PriceType::ReverseBest;
111 } else if (xtp_exchange_type == XTP_MKT_SZ_A) {
112 if (xtp_price_type == XTP_PRICE_BEST_OR_CANCEL)
113 price_type = PriceType::Fak;
114 else if (xtp_price_type == XTP_PRICE_FORWARD_BEST)
115 price_type = PriceType::ForwardBest;
116 else if (xtp_price_type == XTP_PRICE_REVERSE_BEST_LIMIT)
117 price_type = PriceType::ReverseBest;
118 else if (xtp_price_type == XTP_PRICE_ALL_OR_CANCEL)
119 price_type = PriceType::Fok;
120 } else
121 price_type = PriceType::Unknown;
122}
123
124inline void to_xtp(XTP_PRICE_TYPE &xtp_price_type, const PriceType &price_type, const char *exchange) {
125 if (price_type == PriceType::Limit)
126 xtp_price_type = XTP_PRICE_LIMIT;
127 else if ((price_type == PriceType::Any) || (price_type == PriceType::FakBest5))
128 xtp_price_type = XTP_PRICE_BEST5_OR_CANCEL;
129 else if (strcmp(exchange, EXCHANGE_SSE) == 0) {
130 if (price_type == PriceType::ReverseBest)
131 xtp_price_type = XTP_PRICE_BEST5_OR_LIMIT;
132 } else if (strcmp(exchange, EXCHANGE_SZE) == 0) {
133 if (price_type == PriceType::Fak)
134 xtp_price_type = XTP_PRICE_BEST_OR_CANCEL;
135 else if (price_type == PriceType::ForwardBest)
136 xtp_price_type = XTP_PRICE_FORWARD_BEST;
137 else if (price_type == PriceType::ReverseBest)
138 xtp_price_type = XTP_PRICE_REVERSE_BEST_LIMIT;
139 else if (price_type == PriceType::Fok)
140 xtp_price_type = XTP_PRICE_ALL_OR_CANCEL;
141 } else
142 xtp_price_type = XTP_PRICE_TYPE_UNKNOWN;
143}
144
145inline void from_xtp(const XTP_ORDER_STATUS_TYPE &xtp_order_status, OrderStatus &status) {
146 if (xtp_order_status == XTP_ORDER_STATUS_INIT || xtp_order_status == XTP_ORDER_STATUS_NOTRADEQUEUEING) {
147 status = OrderStatus::Pending;
148 } else if (xtp_order_status == XTP_ORDER_STATUS_ALLTRADED) {
149 status = OrderStatus::Filled;
150 } else if (xtp_order_status == XTP_ORDER_STATUS_CANCELED) {
151 status = OrderStatus::Cancelled;
152 } else if (xtp_order_status == XTP_ORDER_STATUS_PARTTRADEDQUEUEING) {
153 status = OrderStatus::PartialFilledActive;
154 } else if (xtp_order_status == XTP_ORDER_STATUS_PARTTRADEDNOTQUEUEING) {
155 status = OrderStatus::PartialFilledNotActive;
156 } else if (xtp_order_status == XTP_ORDER_STATUS_REJECTED) {
157 status = OrderStatus::Error;
158 } else {
159 status = OrderStatus::Unknown;
160 }
161}
162
163inline void from_xtp(XTPQSI *ticker_info, Instrument &instrument) {
164 instrument.instrument_id = ticker_info->ticker;
165 if (ticker_info->exchange_id == 1) {
166 instrument.exchange_id = EXCHANGE_SSE;
167 } else if (ticker_info->exchange_id == 2) {
168 instrument.exchange_id = EXCHANGE_SZE;
169 } else {
170 instrument.exchange_id = "unknown";
171 }
172 memcpy(instrument.product_id, ticker_info->ticker_name, strlen(ticker_info->ticker_name));
173 instrument.instrument_type = get_instrument_type(instrument.exchange_id, instrument.instrument_id);
174 instrument.price_tick = ticker_info->price_tick;
175}
176
177inline void from_xtp(const XTP_SIDE_TYPE &xtp_side, Side &side) {
178 if (xtp_side == XTP_SIDE_BUY) {
179 side = Side::Buy;
180 } else if (xtp_side == XTP_SIDE_SELL) {
181 side = Side::Sell;
182 }
183}
184
185inline void to_xtp(XTP_SIDE_TYPE &xtp_side, const Side &side) { // Map Side to the XTP side code; any other Side leaves `xtp_side` untouched.
186 switch (side) {
187 case Side::Buy: xtp_side = XTP_SIDE_BUY; break;
188 case Side::Sell: xtp_side = XTP_SIDE_SELL; break;
189 default: break;
190 }
191}
192
193inline void to_xtp(XTPMarketDataStruct &des, const Quote &ori) {
194 // TODO
195}
196
197inline void from_xtp(const XTPMarketDataStruct &ori, Quote &des) { // Convert an XTP market-data snapshot into a kungfu Quote.
198 des.data_time = nsec_from_xtp_timestamp(ori.data_time);
199 des.instrument_id = ori.ticker;
200 from_xtp(ori.exchange_id, des.exchange_id);
201
202 des.instrument_type = ori.data_type != XTP_MARKETDATA_OPTION
203 ? get_instrument_type(des.exchange_id, des.instrument_id)
204 : InstrumentType::StockOption;
205
206 des.last_price = ori.last_price;
207 des.pre_settlement_price = ori.pre_settl_price;
208 des.pre_close_price = ori.pre_close_price;
209 des.open_price = ori.open_price;
210 des.high_price = ori.high_price;
211 des.low_price = ori.low_price;
212 des.volume = ori.qty;
213 des.turnover = ori.turnover;
214 des.close_price = ori.close_price;
215 des.settlement_price = ori.settl_price;
216 des.upper_limit_price = ori.upper_limit_price;
217 des.lower_limit_price = ori.lower_limit_price;
218 des.total_trade_num = ori.trades_count;
219
220 memcpy(des.ask_price, ori.ask, sizeof(des.ask_price));
221 memcpy(des.bid_price, ori.bid, sizeof(des.bid_price)); // size comes from the buffer actually written, not from the ask side
222 for (std::size_t i = 0; i < 10; i++) { // copies ten ask/bid volume levels element by element
223 des.ask_volume[i] = ori.ask_qty[i];
224 des.bid_volume[i] = ori.bid_qty[i];
225 }
226}
227
228inline void to_xtp(XTPOrderInsertInfo &des, const OrderInput &ori) {
229 strcpy(des.ticker, ori.instrument_id);
230 to_xtp(des.market, ori.exchange_id);
231 des.price = ori.limit_price;
232 des.quantity = ori.volume;
233 to_xtp(des.side, ori.side);
234 to_xtp(des.price_type, ori.price_type, ori.exchange_id);
235 des.business_type = XTP_BUSINESS_TYPE_CASH;
236}
237
238inline void from_xtp(const XTPOrderInsertInfo &ori, OrderInput &des) {
239 // TODO
240}
241
242inline void from_xtp(const XTPOrderInfo &ori, Order &des) {
243 des.instrument_id = ori.ticker;
244 from_xtp(ori.market, des.exchange_id);
245 from_xtp(ori.price_type, ori.market, des.price_type);
246 des.volume = ori.quantity;
247 des.volume_left = ori.quantity - ori.qty_traded;
248 des.limit_price = ori.price;
249 from_xtp(ori.order_status, des.status);
250 from_xtp(ori.side, des.side);
251 set_offset(des);
252 des.instrument_type = get_instrument_type(des.exchange_id, des.instrument_id);
253 if (ori.update_time > 0) {
254 des.update_time = nsec_from_xtp_timestamp(ori.update_time);
255 }
256 std::string str_external_order_id = std::to_string(ori.order_xtp_id);
257 des.external_order_id = str_external_order_id.c_str();
258}
259
260inline void from_xtp(const XTPQueryOrderRsp &ori, HistoryOrder &des) { // Convert an XTP order-query response into a kungfu HistoryOrder.
261 des.instrument_id = ori.ticker;
262 from_xtp(ori.market, des.exchange_id);
263 from_xtp(ori.price_type, ori.market, des.price_type);
264 des.volume = ori.quantity;
265 des.volume_left = ori.qty_left;
266 des.limit_price = ori.price;
267 from_xtp(ori.order_status, des.status);
268 from_xtp(ori.side, des.side);
269 set_offset(des);
270 des.instrument_type = get_instrument_type(des.exchange_id, des.instrument_id);
271 if (ori.update_time > 0) { // update_time is only overwritten when XTP supplies a positive timestamp
272 des.update_time = nsec_from_xtp_timestamp(ori.update_time);
273 }
274 des.external_order_id = std::to_string(ori.order_xtp_id).c_str(); // was a comma expression, so the id was never stored
275}
276
277inline void from_xtp_no_price_type(const XTPOrderInfo &ori, Order &des) {
278 des.instrument_id = ori.ticker;
279 from_xtp(ori.market, des.exchange_id);
280 des.volume = ori.quantity;
281 des.volume_left = ori.quantity - ori.qty_traded;
282 des.limit_price = ori.price;
283 from_xtp(ori.order_status, des.status);
284 from_xtp(ori.side, des.side);
285 set_offset(des);
286 des.instrument_type = get_instrument_type(des.exchange_id, des.instrument_id);
287 if (ori.update_time > 0) {
288 des.update_time = nsec_from_xtp_timestamp(ori.update_time);
289 }
290 std::string str_external_order_id = std::to_string(ori.order_xtp_id);
291 des.external_order_id = str_external_order_id.c_str();
292}
293
294inline void from_xtp(const XTPTradeReport &ori, Trade &des) {
295 des.instrument_id = ori.ticker;
296 des.volume = ori.quantity;
297 des.price = ori.price;
298 from_xtp(ori.market, des.exchange_id);
299 des.instrument_type = get_instrument_type(des.exchange_id, des.instrument_id);
300 from_xtp(ori.side, des.side);
301 set_offset(des);
302 des.trade_time = yijinjing::time::now_in_nano();
303 des.external_order_id = std::to_string(ori.order_xtp_id).c_str();
304 des.external_trade_id = ori.exec_id;
305}
306
307inline void from_xtp(const XTPQueryTradeRsp &ori, HistoryTrade &des) {
308 des.instrument_id = ori.ticker;
309 des.volume = ori.quantity;
310 des.price = ori.price;
311 from_xtp(ori.market, des.exchange_id);
312 from_xtp(ori.side, des.side);
313 // des.offset = Offset::Open;
314 set_offset(des);
315 des.instrument_type = get_instrument_type(des.exchange_id, des.instrument_id);
316 des.trade_time = nsec_from_xtp_timestamp(ori.trade_time);
317 des.external_order_id = std::to_string(ori.order_xtp_id).c_str();
318 des.external_trade_id = ori.exec_id;
319}
320
321inline void from_xtp(const XTPQueryStkPositionRsp &ori, Position &des) {
322 des.instrument_id = ori.ticker;
323 from_xtp(ori.market, des.exchange_id);
324 des.volume = ori.total_qty;
325 des.yesterday_volume = ori.sellable_qty;
326 des.avg_open_price = ori.avg_price;
327 des.position_cost_price = ori.avg_price;
328 des.static_yesterday = ori.yesterday_position;
329 // des.open_volume // 数据不足以算出该字段, 保持为0
330 // des.frozen_yesterday // 数据不足以算出该字段, 保持为0
331 // des.frozen_total // 数据不足以算出该字段, 保持为0
332}
333
334inline void from_xtp(const XTPQueryAssetRsp &ori, Asset &des) { des.avail = ori.buying_power; }
335
336inline void from_xtp(const XTPTickByTickStruct &ori, Entrust &des) { // Convert the entrust part of an XTP tick-by-tick record into a kungfu Entrust.
337 from_xtp(ori.exchange_id, des.exchange_id);
338 des.instrument_id = ori.ticker;
339 des.data_time = nsec_from_xtp_timestamp(ori.data_time);
340
341 des.price = ori.entrust.price;
342 des.volume = ori.entrust.qty;
343 des.main_seq = ori.entrust.channel_no;
344 des.seq = ori.entrust.seq;
345
346 if (ori.entrust.ord_type == '1') { // any other ord_type leaves des.price_type untouched
347 des.price_type = PriceType::Any;
348 } else if (ori.entrust.ord_type == '2') {
349 des.price_type = PriceType::Limit;
350 } else if (ori.entrust.ord_type == 'U') {
351 des.price_type = PriceType::ForwardBest;
352 }
353
354 // XTP: for SZE the API notes mark order_no as meaningless (occasionally 0) and seq is the real order number; for SSE order_no is the order number.
355 if (strcmp(des.exchange_id, "SSE") == 0) { // strcmp returns 0 on equality, so compare explicitly to select the SSE branch
356 des.orig_order_no = ori.entrust.order_no;
357 } else {
358 des.orig_order_no = ori.entrust.seq;
359 }
360
361 switch (ori.entrust.side) { // 'B'/'1' map to Buy, 'S'/'2' map to Sell, anything else to Unknown
362 case 'B': {
363 des.side = Side::Buy;
364 break;
365 }
366 case 'S': {
367 des.side = Side::Sell;
368 break;
369 }
370 case '1': {
371 des.side = Side::Buy;
372 break;
373 }
374 case '2': {
375 des.side = Side::Sell;
376 break;
377 }
378 default: {
379 des.side = Side::Unknown;
380 break;
381 }
382 }
383}
384
385inline void from_xtp(const XTPTickByTickStruct &ori, Transaction &des) {
386 from_xtp(ori.exchange_id, des.exchange_id);
387 des.instrument_id = ori.ticker;
388 des.data_time = nsec_from_xtp_timestamp(ori.data_time);
389
390 if (ori.type == XTP_TBT_ENTRUST) {
391 des.instrument_id = ori.ticker;
392 des.data_time = nsec_from_xtp_timestamp(ori.data_time);
393
394 des.main_seq = ori.entrust.channel_no;
395 des.seq = ori.entrust.seq;
396
397 des.price = ori.entrust.price;
398 des.volume = ori.entrust.qty;
399
400 if (ori.entrust.side == 'B') {
401 des.side = Side::Buy;
402 des.bid_no = ori.entrust.order_no;
403 } else {
404 des.side = Side::Sell;
405 des.ask_no = ori.entrust.order_no;
406 }
407 des.exec_type = ExecType::Cancel;
408
409 } else {
410
411 des.main_seq = ori.trade.channel_no;
412 des.seq = ori.trade.seq;
413
414 des.price = ori.trade.price;
415 des.volume = ori.trade.qty;
416
417 des.bid_no = ori.trade.bid_no;
418 des.ask_no = ori.trade.ask_no;
419
420 switch (ori.trade.trade_flag) {
421 case 'B': {
422 des.side = Side::Buy;
423 des.exec_type = ExecType::Trade;
424 break;
425 }
426 case 'S': {
427 des.side = Side::Sell;
428 des.exec_type = ExecType::Trade;
429 break;
430 }
431 case 'N': {
432 des.side = Side::Unknown;
433 des.exec_type = ExecType::Trade;
434 break;
435 }
436 case '4': {
437 des.side = (des.bid_no < des.ask_no) ? Side::Sell : Side::Buy;
438 des.exec_type = ExecType::Cancel;
439 break;
440 }
441 case 'F': {
442 des.side = (des.bid_no < des.ask_no) ? Side::Sell : Side::Buy;
443 des.exec_type = ExecType::Trade;
444 break;
445 }
446 default: {
447 break;
448 }
449 }
450 }
451}
452} // namespace kungfu::wingchun::xtp
453#endif // KUNGFU_XTP_EXT_TYPE_CONVERT_H
buffer_data.h文件
封装xtp回调函数参数, 方便落地原始数据到journal
1#ifndef XTP_BUFFER_DATA_H
2#define XTP_BUFFER_DATA_H
3
4#include "serialize_xtp.h"
5
6static constexpr int32_t kXTPOrderInfoType = 12340001;
7static constexpr int32_t kXTPTradeReportType = 12340002;
8static constexpr int32_t kQueryXTPOrderInfoType = 12340003;
9static constexpr int32_t kQueryXTPTradeReportType = 12340004;
10static constexpr int32_t kCancelOrderErrorType = 12340005;
11
12static constexpr int32_t kQueryAssetType = 12340011;
13static constexpr int32_t kQueryPositionType = 12340012;
14
15struct BufferXTPTradeReport {
16 XTPQueryTradeRsp trade_info;
17 XTPRI error_info;
18 int request_id;
19 bool is_last;
20 uint64_t session_id;
21};
22
23struct BufferXTPOrderInfo {
24 XTPOrderInfo order_info;
25 XTPRI error_info;
26 int request_id;
27 bool is_last;
28 uint64_t session_id;
29};
30
31struct BufferXTPOrderCancelInfo {
32 XTPOrderCancelInfo cancel_info;
33 XTPRI error_info;
34 uint64_t session_id;
35};
36
37struct BufferXTPQueryAssetRsp {
38 XTPQueryAssetRsp asset;
39 XTPRI error_info;
40 int request_id;
41 bool is_last;
42 uint64_t session_id;
43};
44
45struct BufferXTPQueryStkPositionRsp {
46 XTPQueryStkPositionRsp position;
47 XTPRI error_info;
48 int request_id;
49 bool is_last;
50 uint64_t session_id;
51};
52
53namespace nlohmann {
54NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(BufferXTPTradeReport, trade_info, session_id, error_info, request_id, is_last);
55NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(BufferXTPOrderInfo, order_info, session_id, error_info, request_id, is_last);
56NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(BufferXTPOrderCancelInfo, cancel_info, error_info, session_id);
57NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(BufferXTPQueryAssetRsp, asset, error_info, session_id, request_id, is_last);
58NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(BufferXTPQueryStkPositionRsp, position, error_info, session_id, request_id, is_last);
59
60} // namespace nlohmann
61
62#endif // XTP_BUFFER_DATA_H
trader_xtp.h文件
xtp的交易柜台头文件定义
1#ifndef KUNGFU_XTP_EXT_TRADER_H
2#define KUNGFU_XTP_EXT_TRADER_H
3
4#include <kungfu/wingchun/broker/trader.h>
5#include <xtp_trader_api.h>
6
7namespace kungfu::wingchun::xtp {
8using namespace kungfu::longfist;
9using namespace kungfu::longfist::types;
10
11struct TDConfiguration {
12int client_id;
13std::string account_id;
14std::string password;
15std::string software_key;
16std::string td_ip;
17int td_port;
18bool sync_external_order;
19bool recover_order_trade;
20};
21
22inline void from_json(const nlohmann::json &j, TDConfiguration &c) {
23j.at("client_id").get_to(c.client_id);
24j.at("account_id").get_to(c.account_id);
25j.at("password").get_to(c.password);
26j.at("software_key").get_to(c.software_key);
27j.at("td_ip").get_to(c.td_ip);
28j.at("td_port").get_to(c.td_port);
29c.sync_external_order = j.value<bool>("sync_external_order", false);
30c.recover_order_trade = j.value<bool>("recover_order_trade", true);
31}
32
33class TraderXTP : public XTP::API::TraderSpi, public broker::Trader {
34public:
35explicit TraderXTP(broker::BrokerVendor &vendor);
36
37~TraderXTP() override;
38
39[[nodiscard]] longfist::enums::AccountType get_account_type() const override {
40 return longfist::enums::AccountType::Stock;
41}
42
43void pre_start() override;
44
45void on_start() override;
46
47void on_exit() override;
48
49bool insert_order(const event_ptr &event) override;
50
51bool cancel_order(const event_ptr &event) override;
52
53bool req_position() override;
54
55bool on_custom_event(const event_ptr &event) override;
56
57bool req_account() override;
58
59bool req_history_order(const event_ptr &event) override;
60
61bool req_history_trade(const event_ptr &event) override;
62
63void on_recover() override;
64
65/// 当客户端的某个连接与交易后台通信连接断开时,该方法被调用。
66///@param reason 错误原因,请与错误代码表对应
67///@param session_id 资金账户对应的session_id,登录时得到
68///@remark
69/// 用户主动调用logout导致的断线,不会触发此函数。api不会自动重连,当断线发生时,请用户自行选择后续操作,可以在此函数中调用Login重新登录,并更新session_id,此时用户收到的数据跟断线之前是连续的
70void OnDisconnected(uint64_t session_id, int reason) override;
71
72/// 错误应答
73///@param error_info
74/// 当服务器响应发生错误时的具体的错误代码和错误信息,当error_info为空,或者error_info.error_id为0时,表明没有错误
75///@remark 此函数只有在服务器发生错误时才会调用,一般无需用户处理
76void OnError(XTPRI *error_info) override{};
77
78/// 报单通知
79///@param order_info 订单响应具体信息,用户可以通过order_info.order_xtp_id来管理订单,通过GetClientIDByXTPID() ==
80/// client_id来过滤自己的订单,order_info.qty_left字段在订单为未成交、部成、全成、废单状态时,表示此订单还没有成交的数量,在部撤、全撤状态时,表示此订单被撤的数量。order_info.order_cancel_xtp_id为其所对应的撤单ID,不为0时表示此单被撤成功
81///@param error_info
82/// 订单被拒绝或者发生错误时错误代码和错误信息,当error_info为空,或者error_info.error_id为0时,表明没有错误
83///@remark
84/// 每次订单状态更新时,都会被调用,需要快速返回,否则会堵塞后续消息,当堵塞严重时,会触发断线,在订单未成交、全部成交、全部撤单、部分撤单、已拒绝这些状态时会有响应,对于部分成交的情况,请由订单的成交回报来自行确认。所有登录了此用户的客户端都将收到此用户的订单响应
85void OnOrderEvent(XTPOrderInfo *order_info, XTPRI *error_info, uint64_t session_id) override;
86
87/// 成交通知
88///@param trade_info 成交回报的具体信息,用户可以通过trade_info.order_xtp_id来管理订单,通过GetClientIDByXTPID() ==
89/// client_id来过滤自己的订单。对于上交所,exec_id可以唯一标识一笔成交。当发现2笔成交回报拥有相同的exec_id,则可以认为此笔交易自成交了。对于深交所,exec_id是唯一的,暂时无此判断机制。report_index+market字段可以组成唯一标识表示成交回报。
90///@remark
91/// 订单有成交发生的时候,会被调用,需要快速返回,否则会堵塞后续消息,当堵塞严重时,会触发断线。所有登录了此用户的客户端都将收到此用户的成交回报。相关订单为部成状态,需要用户通过成交回报的成交数量来确定,OnOrderEvent()不会推送部成状态。
92void OnTradeEvent(XTPTradeReport *trade_info, uint64_t session_id) override;
93
94/// 撤单出错响应
95///@param cancel_info 撤单具体信息,包括撤单的order_cancel_xtp_id和待撤单的order_xtp_id
96///@param error_info
97/// 撤单被拒绝或者发生错误时错误代码和错误信息,需要快速返回,否则会堵塞后续消息,当堵塞严重时,会触发断线,当error_info为空,或者error_info.error_id为0时,表明没有错误
98///@remark 此响应只会在撤单发生错误时被回调
99void OnCancelOrderError(XTPOrderCancelInfo *cancel_info, XTPRI *error_info, uint64_t session_id) override;
100
101/// 请求查询报单响应
102///@param order_info 查询到的一个报单
103///@param error_info
104/// 查询报单时发生错误时,返回的错误信息,当error_info为空,或者error_info.error_id为0时,表明没有错误
105///@param request_id 此消息响应函数对应的请求ID
106///@param is_last
107/// 此消息响应函数是否为request_id这条请求所对应的最后一个响应,当为最后一个的时候为true,如果为false,表示还有其他后续消息响应
108///@remark
109/// 由于支持分时段查询,一个查询请求可能对应多个响应,需要快速返回,否则会堵塞后续消息,当堵塞严重时,会触发断线
110void OnQueryOrder(XTPQueryOrderRsp *order_info, XTPRI *error_info, int request_id, bool is_last,
111 uint64_t session_id) override;
112
113/// 请求查询成交响应
114///@param trade_info 查询到的一个成交回报
115///@param error_info
116/// 查询成交回报发生错误时返回的错误信息,当error_info为空,或者error_info.error_id为0时,表明没有错误
117///@param request_id 此消息响应函数对应的请求ID
118///@param is_last
119/// 此消息响应函数是否为request_id这条请求所对应的最后一个响应,当为最后一个的时候为true,如果为false,表示还有其他后续消息响应
120///@remark
121/// 由于支持分时段查询,一个查询请求可能对应多个响应,需要快速返回,否则会堵塞后续消息,当堵塞严重时,会触发断线
122void OnQueryTrade(XTPQueryTradeRsp *trade_info, XTPRI *error_info, int request_id, bool is_last,
123 uint64_t session_id) override;
124
125/// 请求查询投资者持仓响应
126///@param position 查询到的一只股票的持仓情况
127///@param error_info
128/// 查询账户持仓发生错误时返回的错误信息,当error_info为空,或者error_info.error_id为0时,表明没有错误
129///@param request_id 此消息响应函数对应的请求ID
130///@param is_last
131/// 此消息响应函数是否为request_id这条请求所对应的最后一个响应,当为最后一个的时候为true,如果为false,表示还有其他后续消息响应
132///@remark
133/// 由于用户可能持有多个股票,一个查询请求可能对应多个响应,需要快速返回,否则会堵塞后续消息,当堵塞严重时,会触发断线
134void OnQueryPosition(XTPQueryStkPositionRsp *position, XTPRI *error_info, int request_id, bool is_last,
135 uint64_t session_id) override;
136
137/// 请求查询资金账户响应,需要快速返回,否则会堵塞后续消息,当堵塞严重时,会触发断线
138///@param asset 查询到的资金账户情况
139///@param error_info
140/// 查询资金账户发生错误时返回的错误信息,当error_info为空,或者error_info.error_id为0时,表明没有错误
141///@param request_id 此消息响应函数对应的请求ID
142///@param is_last
143/// 此消息响应函数是否为request_id这条请求所对应的最后一个响应,当为最后一个的时候为true,如果为false,表示还有其他后续消息响应
144///@remark 需要快速返回,否则会堵塞后续消息,当堵塞严重时,会触发断线
145void OnQueryAsset(XTPQueryAssetRsp *asset, XTPRI *error_info, int request_id, bool is_last,
146 uint64_t session_id) override;
147
148/// 请求查询分级基金信息响应,需要快速返回,否则会堵塞后续消息,当堵塞严重时,会触发断线
149///@param fund_info 查询到的分级基金情况
150///@param error_info
151/// 查询分级基金发生错误时返回的错误信息,当error_info为空,或者error_info.error_id为0时,表明没有错误
152///@param request_id 此消息响应函数对应的请求ID
153///@param is_last
154/// 此消息响应函数是否为request_id这条请求所对应的最后一个响应,当为最后一个的时候为true,如果为false,表示还有其他后续消息响应
155///@remark 需要快速返回,否则会堵塞后续消息,当堵塞严重时,会触发断线
156void OnQueryStructuredFund(XTPStructuredFundInfo *fund_info, XTPRI *error_info, int request_id, bool is_last,
157 uint64_t session_id) override{};
158
159/// 请求查询资金划拨订单响应,需要快速返回,否则会堵塞后续消息,当堵塞严重时,会触发断线
160///@param fund_transfer_info 查询到的资金账户情况
161///@param error_info
162/// 查询资金账户发生错误时返回的错误信息,当error_info为空,或者error_info.error_id为0时,表明没有错误
163///@param request_id 此消息响应函数对应的请求ID
164///@param is_last
165/// 此消息响应函数是否为request_id这条请求所对应的最后一个响应,当为最后一个的时候为true,如果为false,表示还有其他后续消息响应
166///@remark 需要快速返回,否则会堵塞后续消息,当堵塞严重时,会触发断线
167void OnQueryFundTransfer(XTPFundTransferNotice *fund_transfer_info, XTPRI *error_info, int request_id, bool is_last,
168 uint64_t session_id) override{};
169
170/// 资金划拨通知
171///@param fund_transfer_info
172/// 资金划拨通知的具体信息,用户可以通过fund_transfer_info.serial_id来管理订单,通过GetClientIDByXTPID() ==
173/// client_id来过滤自己的订单。
174///@param error_info
175/// 资金划拨订单被拒绝或者发生错误时错误代码和错误信息,当error_info为空,或者error_info.error_id为0时,表明没有错误
176///@remark
177/// 当资金划拨订单有状态变化的时候,会被调用,需要快速返回,否则会堵塞后续消息,当堵塞严重时,会触发断线。所有登录了此用户的客户端都将收到此用户的资金划拨通知。
178void OnFundTransfer(XTPFundTransferNotice *fund_transfer_info, XTPRI *error_info, uint64_t session_id) override{};
179
180/// 请求查询ETF清单文件的响应,需要快速返回,否则会堵塞后续消息,当堵塞严重时,会触发断线
181///@param etf_info 查询到的ETF清单文件情况
182///@param error_info
183/// 查询ETF清单文件发生错误时返回的错误信息,当error_info为空,或者error_info.error_id为0时,表明没有错误
184///@param request_id 此消息响应函数对应的请求ID
185///@param is_last
186/// 此消息响应函数是否为request_id这条请求所对应的最后一个响应,当为最后一个的时候为true,如果为false,表示还有其他后续消息响应
187///@remark 需要快速返回,否则会堵塞后续消息,当堵塞严重时,会触发断线
188void OnQueryETF(XTPQueryETFBaseRsp *etf_info, XTPRI *error_info, int request_id, bool is_last,
189 uint64_t session_id) override{};
190
191/// 请求查询ETF股票篮的响应,需要快速返回,否则会堵塞后续消息,当堵塞严重时,会触发断线
192///@param etf_component_info 查询到的ETF合约的相关成分股信息
193///@param error_info
194/// 查询ETF股票篮发生错误时返回的错误信息,当error_info为空,或者error_info.error_id为0时,表明没有错误
195///@param request_id 此消息响应函数对应的请求ID
196///@param is_last
197/// 此消息响应函数是否为request_id这条请求所对应的最后一个响应,当为最后一个的时候为true,如果为false,表示还有其他后续消息响应
198///@remark 需要快速返回,否则会堵塞后续消息,当堵塞严重时,会触发断线
199void OnQueryETFBasket(XTPQueryETFComponentRsp *etf_component_info, XTPRI *error_info, int request_id, bool is_last,
200 uint64_t session_id) override{};
201
202/// 请求查询今日新股申购信息列表的响应,需要快速返回,否则会堵塞后续消息,当堵塞严重时,会触发断线
203///@param ipo_info 查询到的今日新股申购的一只股票信息
204///@param error_info
205/// 查询今日新股申购信息列表发生错误时返回的错误信息,当error_info为空,或者error_info.error_id为0时,表明没有错误
206///@param request_id 此消息响应函数对应的请求ID
207///@param is_last
208/// 此消息响应函数是否为request_id这条请求所对应的最后一个响应,当为最后一个的时候为true,如果为false,表示还有其他后续消息响应
209///@remark 需要快速返回,否则会堵塞后续消息,当堵塞严重时,会触发断线
210void OnQueryIPOInfoList(XTPQueryIPOTickerRsp *ipo_info, XTPRI *error_info, int request_id, bool is_last,
211 uint64_t session_id) override{};
212
213/// 请求查询用户新股申购额度信息的响应,需要快速返回,否则会堵塞后续消息,当堵塞严重时,会触发断线
214///@param quota_info 查询到的用户某个市场的今日新股申购额度信息
215///@param error_info
216/// 查查询用户新股申购额度信息发生错误时返回的错误信息,当error_info为空,或者error_info.error_id为0时,表明没有错误
217///@param request_id 此消息响应函数对应的请求ID
218///@param is_last
219/// 此消息响应函数是否为request_id这条请求所对应的最后一个响应,当为最后一个的时候为true,如果为false,表示还有其他后续消息响应
220///@remark 需要快速返回,否则会堵塞后续消息,当堵塞严重时,会触发断线
221void OnQueryIPOQuotaInfo(XTPQueryIPOQuotaRsp *quota_info, XTPRI *error_info, int request_id, bool is_last,
222 uint64_t session_id) override{};
223
224private:
225TDConfiguration config_{};
226XTP::API::TraderApi *api_{};
227uint64_t session_id_{};
228int request_id_{};
229int get_request_id() { return ++request_id_; }
230std::string trading_day_{};
231
232std::unordered_map<uint64_t, uint64_t> map_kf_to_xtp_order_id_{};
233std::unordered_map<uint64_t, uint64_t> map_xtp_to_kf_order_id_{};
234std::unordered_map<uint64_t, uint64_t> map_request_location_{};
235std::unordered_map<uint64_t, std::unordered_set<std::string>> map_xtp_order_id_to_xtp_trader_ids_{};
236std::unordered_map<uint64_t, std::vector<XTPTradeReport>> map_xtp_order_id_to_XTPTradeReports_{};
237std::unordered_map<uint64_t, int64_t> map_xtp_order_id_to_traded_volume_{};
238std::unordered_map<uint64_t, std::queue<uint64_t>> map_xtp_order_id_to_action_ids_{};
239
240yijinjing::journal::writer_ptr get_history_writer(uint64_t request_id);
241
242bool custom_OnCancelOrderError(const event_ptr &event);
243bool custom_OnOrderEvent(const event_ptr &event);
244bool custom_OnTradeEvent(const event_ptr &event);
245bool custom_OnQueryOrder(const event_ptr &event);
246bool custom_OnQueryTrade(const event_ptr &event);
247bool custom_OnQueryAsset(const event_ptr &event);
248bool custom_OnQueryPosition(const event_ptr &event);
249
250bool custom_OnCancelOrderError(const XTPOrderCancelInfo &cancel_info, const XTPRI &error_info, uint64_t session_id);
251bool custom_OnOrderEvent(const XTPOrderInfo &order_info, const XTPRI &error_info, uint64_t session_id);
252bool custom_OnTradeEvent(const XTPTradeReport &trade_info, uint64_t session_id);
253bool custom_OnQueryOrder(const XTPOrderInfo &order_info, const XTPRI &error_info, int request_id, bool is_last,
254 uint64_t session_id);
255bool custom_OnQueryTrade(const XTPTradeReport &trade_info, const XTPRI &error_info, int request_id, bool is_last,
256 uint64_t session_id);
257bool custom_OnQueryAsset(const XTPQueryAssetRsp &asset, const XTPRI &error_info, int request_id, bool is_last,
258 uint64_t session_id);
259bool custom_OnQueryPosition(const XTPQueryStkPositionRsp &position, const XTPRI &error_info, int request_id,
260 bool is_last, uint64_t session_id);
261
262void try_deal_XTPTradeReport(uint64_t xtp_order_id);
263bool generate_external_order(const XTPOrderInfo &order_info);
264
265void add_XTPTradeReport(const XTPTradeReport &trade_info);
266bool has_dealt_trade(uint64_t xtp_order_id, const std::string &exec_id);
267void add_dealt_trade(uint64_t xtp_order_id, const std::string &exec_id);
268
269void add_traded_volume(uint64_t order_xtp_id, int64_t trade_volume);
270int64_t get_traded_volume(uint64_t order_xtp_id);
271
272void add_action_id(uint64_t xtp_order_id, int64_t action_id);
273uint64_t get_action_id(uint64_t xtp_order_id);
274
275void req_order_trade();
276void try_ready();
277bool req_order_over_{false};
278bool req_trade_over_{false};
279};
280} // namespace kungfu::wingchun::xtp
281#endif // KUNGFU_XTP_EXT_TRADER_H
trader_xtp.cpp文件
xtp的交易柜台函数实现
1#include "trader_xtp.h"
2#include "buffer_data.h"
3#include "serialize_xtp.h"
4#include "type_convert.h"
5#include <algorithm>
6
7namespace kungfu::wingchun::xtp {
8using namespace kungfu::yijinjing::data;
9using namespace kungfu::yijinjing;
10
11TraderXTP::TraderXTP(broker::BrokerVendor &vendor) : Trader(vendor) {
12 KUNGFU_SETUP_LOG();
13 SPDLOG_DEBUG("arguments: {}", get_vendor().get_arguments());
14}
15
16TraderXTP::~TraderXTP() {
17 if (api_ != nullptr) {
18 api_->Release();
19 }
20}
21
22void TraderXTP::pre_start() {
23 config_ = nlohmann::json::parse(get_config());
24 SPDLOG_INFO("config: {}", get_config());
25 if (not config_.recover_order_trade) {
26 disable_recover();
27 }
28}
29
30void TraderXTP::on_start() {
31 if (config_.client_id < 1 or config_.client_id > 99) {
32 SPDLOG_ERROR("client_id must between 1 and 99");
33 }
34 std::string runtime_folder = get_runtime_folder();
35 SPDLOG_INFO("Connecting XTP account {} with tcp://{}:{}", config_.account_id, config_.td_ip, config_.td_port);
36 api_ = XTP::API::TraderApi::CreateTraderApi(config_.client_id, runtime_folder.c_str());
37 api_->RegisterSpi(this);
38 api_->SubscribePublicTopic(XTP_TERT_QUICK);
39 api_->SetSoftwareVersion("1.1.0");
40 api_->SetSoftwareKey(config_.software_key.c_str());
41 session_id_ = api_->Login(config_.td_ip.c_str(), config_.td_port, config_.account_id.c_str(),
42 config_.password.c_str(), XTP_PROTOCOL_TCP);
43 if (session_id_ > 0) {
44 SPDLOG_INFO("Login successfully");
45 req_order_trade();
46 } else {
47 update_broker_state(BrokerState::LoginFailed);
48 SPDLOG_ERROR("Login failed [{}]: {}", api_->GetApiLastError()->error_id, api_->GetApiLastError()->error_msg);
49 }
50}
51
52void TraderXTP::on_exit() {
53 if (api_ != nullptr and session_id_ > 0) {
54 auto result = api_->Logout(session_id_);
55 SPDLOG_INFO("Logout with return code {}", result);
56 }
57}
58
59bool TraderXTP::insert_order(const event_ptr &event) {
60 const OrderInput &input = event->data<OrderInput>();
61 SPDLOG_DEBUG("OrderInput: {}", input.to_string());
62 XTPOrderInsertInfo xtp_input = {};
63 to_xtp(xtp_input, input);
64
65 SPDLOG_DEBUG("XTPOrderInsertInfo: {}", to_string(xtp_input));
66 uint64_t order_xtp_id = api_->InsertOrder(&xtp_input, session_id_);
67 auto success = order_xtp_id != 0;
68
69 auto nano = yijinjing::time::now_in_nano();
70 auto writer = get_writer(event->source());
71 Order &order = writer->open_data<Order>(event->gen_time());
72 order_from_input(input, order);
73 order.external_order_id = std::to_string(order_xtp_id).c_str();
74 order.insert_time = nano;
75 order.update_time = nano;
76
77 if (success) {
78 map_kf_to_xtp_order_id_.emplace(uint64_t(input.order_id), order_xtp_id);
79 map_xtp_to_kf_order_id_.emplace(order_xtp_id, uint64_t(input.order_id));
80 } else {
81 auto error_info = api_->GetApiLastError();
82 order.error_id = error_info->error_id;
83 order.error_msg = error_info->error_msg;
84 order.status = OrderStatus::Error;
85 }
86
87 SPDLOG_DEBUG("Order: {}", order.to_string());
88 writer->close_data();
89 if (not success) {
90 SPDLOG_ERROR("fail to insert order {}, error id {}, {}", to_string(xtp_input), (int)order.error_id,
91 order.error_msg);
92 }
93 return success;
94}
95
96bool TraderXTP::cancel_order(const event_ptr &event) { // Send a cancel request for a kungfu order to XTP; returns false and writes an OrderActionError when the request fails.
97 const OrderAction &action = event->data<OrderAction>();
98 SPDLOG_DEBUG("OrderAction: {}", action.to_string());
99 auto order_id_iter = map_kf_to_xtp_order_id_.find(action.order_id);
100 if (order_id_iter == map_kf_to_xtp_order_id_.end()) {
101 SPDLOG_ERROR("failed to cancel order {}, can't find related xtp order id", action.order_id);
102 return false;
103 }
104
105 if (not has_order(action.order_id)) {
106 SPDLOG_ERROR("no order_id {} in orders_", action.order_id);
107 return false;
108 }
109
110 auto &order_state = get_order(action.order_id);
111 uint64_t order_xtp_id = order_id_iter->second;
112 add_action_id(order_xtp_id, action.order_action_id);
113 auto xtp_action_id = api_->CancelOrder(order_xtp_id, session_id_);
114 auto success = xtp_action_id != 0; // a zero return value is treated as failure
115
116 if (not success) {
117 XTPRI *error_info = api_->GetApiLastError();
118 SPDLOG_ERROR("failed to cancel order {}, order_xtp_id: {} session_id: {} error_id: {} error_msg: {}",
119 action.order_id, order_xtp_id, session_id_, error_info->error_id, error_info->error_msg);
120 OrderActionError &error = get_writer(event->source())->open_data<OrderActionError>(now());
121 error.order_id = action.order_id; // order ID
122 std::string str_external_order_id = std::to_string(order_xtp_id);
123 error.external_order_id = str_external_order_id.c_str();
124 error.order_action_id = action.order_action_id; // order action ID
125 error.error_id = error_info->error_id; // error ID from the API; xtp_action_id is always 0 in this branch
126 error.error_msg = error_info->error_msg; // error message
127 error.insert_time = time::now_in_nano(); // write time
128 SPDLOG_DEBUG("OrderActionError: {}", error.to_string());
129 get_writer(event->source())->close_data();
130 return false;
131 }
132
133 if (not is_final_status(order_state.data.status) or order_state.data.status == OrderStatus::Lost) {
134 order_state.data.status = OrderStatus::Cancelling;
135 try_write_to(order_state.data, order_state.dest);
136 }
137 SPDLOG_DEBUG("Order: {}", order_state.data.to_string());
138 return success;
139}
140
141bool TraderXTP::req_position() {
142 SPDLOG_INFO("req_position");
143 return api_->QueryPosition(nullptr, session_id_, get_request_id()) == 0;
144}
145
146bool TraderXTP::req_account() {
147 SPDLOG_INFO("req_account");
148 return api_->QueryAsset(session_id_, get_request_id()) == 0;
149}
150
151void TraderXTP::OnDisconnected(uint64_t session_id, int reason) {
152 if (session_id == session_id_) {
153 update_broker_state(BrokerState::DisConnected);
154 SPDLOG_ERROR("disconnected, reason: {}", reason);
155 }
156}
157
158void TraderXTP::OnOrderEvent(XTPOrderInfo *order_info, XTPRI *error_info, uint64_t session_id) {
159 if (nullptr == order_info) {
160 SPDLOG_ERROR("XTPOrderInfo is nullptr");
161 return;
162 }
163 SPDLOG_DEBUG("XTPOrderInfo: {}", to_string(*order_info));
164 auto &bf_order_info = get_thread_writer()->open_custom_data<BufferXTPOrderInfo>(kXTPOrderInfoType, now());
165 memcpy(&bf_order_info.order_info, order_info, sizeof(XTPOrderInfo));
166 bf_order_info.session_id = session_id;
167 if (error_info != nullptr) {
168 memcpy(&bf_order_info.error_info, error_info, sizeof(XTPRI));
169 } else {
170 memset(&bf_order_info.error_info, 0, sizeof(XTPRI));
171 }
172 SPDLOG_DEBUG("BufferXTPOrderInfo: {}", to_string(bf_order_info));
173 get_thread_writer()->close_data();
174}
175
176bool TraderXTP::custom_OnOrderEvent(const event_ptr &event) {
177 const auto *bf_order_info = reinterpret_cast<const BufferXTPOrderInfo *>(event->data_address());
178 return custom_OnOrderEvent(bf_order_info->order_info, bf_order_info->error_info, bf_order_info->session_id);
179}
180
181bool TraderXTP::custom_OnOrderEvent(const XTPOrderInfo &order_info, const XTPRI &error_info, uint64_t session_id) {
182 SPDLOG_DEBUG("XTPOrderInfo: {}", to_string(order_info));
183 SPDLOG_DEBUG("session_id: {}, XTPRI: {}", session_id, to_string(error_info));
184
185 auto order_xtp_id_iter = map_xtp_to_kf_order_id_.find(order_info.order_xtp_id);
186 if (order_xtp_id_iter == map_xtp_to_kf_order_id_.end()) {
187 SPDLOG_WARN("unrecognized order_xtp_id {}@{}", order_info.order_xtp_id, trading_day_);
188 return generate_external_order(order_info);
189 }
190
191 uint64_t kf_order_id = order_xtp_id_iter->second;
192 if (not has_order(kf_order_id)) {
193 return generate_external_order(order_info);
194 }
195
196 auto &order_state = get_order(kf_order_id);
197 if (not is_final_status(order_state.data.status) or order_state.data.status == OrderStatus::Lost) {
198 from_xtp_no_price_type(order_info, order_state.data);
199 order_state.data.update_time = yijinjing::time::now_in_nano();
200 if (error_info.error_id != 0) {
201 order_state.data.error_id = error_info.error_id;
202 order_state.data.error_msg = error_info.error_msg;
203 }
204 try_write_to(order_state.data, order_state.dest);
205 SPDLOG_DEBUG("Order: {}", order_state.data.to_string());
206 try_deal_XTPTradeReport(order_info.order_xtp_id);
207 }
208 return true;
209}
210
211bool TraderXTP::generate_external_order(const XTPOrderInfo &order_info) {
212 SPDLOG_DEBUG("XTPOrderInfo: {}", to_string(order_info));
213 static const std::unordered_set<int> set_cancel_enum = {
214 XTP_ORDER_SUBMIT_STATUS_TYPE::XTP_ORDER_SUBMIT_STATUS_CANCEL_SUBMITTED, //
215 XTP_ORDER_SUBMIT_STATUS_TYPE::XTP_ORDER_SUBMIT_STATUS_CANCEL_REJECTED, //
216 XTP_ORDER_SUBMIT_STATUS_TYPE::XTP_ORDER_SUBMIT_STATUS_CANCEL_ACCEPTED //
217 };
218
219 if (not config_.sync_external_order) {
220 return false;
221 }
222
223 if (set_cancel_enum.find(order_info.order_submit_status) != set_cancel_enum.end()) {
224 SPDLOG_DEBUG("this XTPOrderInfo is xtp cancel order, do not generate kungfu Order");
225 return false;
226 }
227
228 auto writer = get_public_writer();
229 auto nano = yijinjing::time::now_in_nano();
230 Order &order = writer->open_data<Order>(now());
231 order.order_id = writer->current_frame_uid();
232 from_xtp(order_info, order);
233 order.insert_time = nsec_from_xtp_timestamp(order_info.insert_time);
234 order.update_time = nano;
235 map_kf_to_xtp_order_id_.emplace(uint64_t(order.order_id), order_info.order_xtp_id);
236 map_xtp_to_kf_order_id_.emplace(order_info.order_xtp_id, uint64_t(order.order_id));
237 SPDLOG_DEBUG("Order: {}", order.to_string());
238 writer->close_data();
239 try_deal_XTPTradeReport(order_info.order_xtp_id);
240 return true;
241}
242
243void TraderXTP::OnTradeEvent(XTPTradeReport *trade_info, uint64_t session_id) {
244 if (nullptr == trade_info) {
245 SPDLOG_ERROR("XTPTradeReport is nullptr");
246 return;
247 }
248 SPDLOG_DEBUG("XTPTradeReport: {}", to_string(*trade_info));
249
250 auto &bf_trade_info = get_thread_writer()->open_custom_data<BufferXTPTradeReport>(kXTPTradeReportType, now());
251 memcpy(&bf_trade_info.trade_info, trade_info, sizeof(XTPTradeReport));
252 bf_trade_info.session_id = session_id;
253 SPDLOG_DEBUG("BufferXTPOrderInfo: {}", to_string(bf_trade_info));
254 get_thread_writer()->close_data();
255}
256
257bool TraderXTP::custom_OnTradeEvent(const XTPTradeReport &trade_info, uint64_t session_id) {
258 SPDLOG_DEBUG("XTPTradeReport: {}", to_string(trade_info));
259 SPDLOG_DEBUG("session_id: {}", session_id);
260
261 auto order_xtp_id_iter = map_xtp_to_kf_order_id_.find(trade_info.order_xtp_id);
262 if (order_xtp_id_iter == map_xtp_to_kf_order_id_.end()) {
263 SPDLOG_WARN("unrecognized order_xtp_id {}, store in map_xtp_order_id_to_XTPTradeReports_",
264 trade_info.order_xtp_id);
265 add_XTPTradeReport(trade_info);
266 return false;
267 }
268
269 if (has_dealt_trade(trade_info.order_xtp_id, trade_info.exec_id)) {
270 SPDLOG_DEBUG("order_xtp_id:{}, exec_id: {}, has dealt", trade_info.order_xtp_id, trade_info.exec_id);
271 return false;
272 }
273
274 uint64_t kf_order_id = order_xtp_id_iter->second;
275 if (not has_order(kf_order_id)) {
276 SPDLOG_ERROR("no order_id {} in orders_", kf_order_id);
277 return false;
278 }
279
280 add_dealt_trade(trade_info.order_xtp_id, trade_info.exec_id);
281 auto &order_state = get_order(kf_order_id);
282
283 if (has_writer(order_state.dest)) {
284 auto writer = get_writer(order_state.dest);
285 Trade &trade = writer->open_data<Trade>(now());
286 from_xtp(trade_info, trade);
287 trade.trade_id = writer->current_frame_uid();
288 trade.order_id = kf_order_id;
289 add_traded_volume(trade_info.order_xtp_id, trade.volume);
290 SPDLOG_DEBUG("Trade: {}", trade.to_string());
291 writer->close_data();
292 } else {
293 Trade trade{};
294 from_xtp(trade_info, trade);
295 trade.trade_id = get_public_writer()->current_frame_uid() xor (time::now_in_nano() & 0xFFFFFFFF);
296 trade.order_id = kf_order_id;
297 add_traded_volume(trade_info.order_xtp_id, trade.volume);
298 SPDLOG_DEBUG("Trade: {}", trade.to_string());
299 try_write_to(trade, order_state.dest);
300 }
301
302 if (not is_final_status(order_state.data.status) or order_state.data.status == OrderStatus::Lost) {
303 order_state.data.volume_left = std::min<int64_t>(
304 order_state.data.volume_left, order_state.data.volume - get_traded_volume(trade_info.order_xtp_id));
305 if (order_state.data.volume_left > 0) {
306 order_state.data.status = OrderStatus::PartialFilledActive;
307 }
308 order_state.data.update_time = now();
309 SPDLOG_DEBUG("Order: {}", order_state.data.to_string());
310 try_write_to(order_state.data, order_state.dest);
311 }
312 return true;
313}
314
315bool TraderXTP::custom_OnTradeEvent(const event_ptr &event) {
316 const auto *bf_trade_info = reinterpret_cast<const BufferXTPTradeReport *>(event->data_address());
317 return custom_OnTradeEvent(bf_trade_info->trade_info, bf_trade_info->session_id);
318}
319
320void TraderXTP::OnCancelOrderError(XTPOrderCancelInfo *cancel_info, XTPRI *error_info, uint64_t session_id) {
321 if (nullptr == cancel_info) {
322 SPDLOG_ERROR("XTPOrderCancelInfo is nullptr");
323 return;
324 }
325 SPDLOG_ERROR("XTPOrderCancelInfo: {}", to_string(*cancel_info));
326
327 auto &bf_order_cancel_info =
328 get_thread_writer()->open_custom_data<BufferXTPOrderCancelInfo>(kCancelOrderErrorType, now());
329 memcpy(&bf_order_cancel_info.cancel_info, cancel_info, sizeof(XTPOrderCancelInfo));
330 bf_order_cancel_info.session_id = session_id;
331 if (error_info != nullptr) {
332 memcpy(&bf_order_cancel_info.error_info, error_info, sizeof(XTPRI));
333 } else {
334 memset(&bf_order_cancel_info.error_info, 0, sizeof(XTPRI));
335 }
336 SPDLOG_DEBUG("BufferXTPOrderInfo: {}", to_string(bf_order_cancel_info));
337 get_thread_writer()->close_data();
338}
339
340bool TraderXTP::custom_OnCancelOrderError(const event_ptr &event) {
341 const auto &bf_order_cancel_info = event->custom_data<BufferXTPOrderCancelInfo>();
342 return custom_OnCancelOrderError(bf_order_cancel_info.cancel_info, bf_order_cancel_info.error_info,
343 bf_order_cancel_info.session_id);
344}
345
346bool TraderXTP::custom_OnCancelOrderError(const XTPOrderCancelInfo &cancel_info, const XTPRI &error_info,
347 uint64_t session_id) {
348 SPDLOG_DEBUG("XTPOrderCancelInfo: {}", to_string(cancel_info));
349 SPDLOG_DEBUG("session_id: {}, XTPRI: {}", session_id, to_string(error_info));
350
351 uint64_t action_id = get_action_id(cancel_info.order_xtp_id);
352 if (not has_order_action(action_id)) {
353 SPDLOG_WARN("has not related OrderAction of {}:{}", cancel_info.order_xtp_id, action_id);
354 return false;
355 }
356
357 auto action_state = get_order_action(action_id);
358 auto order_id = action_state.data.order_id;
359 if (not has_order(order_id)) {
360 SPDLOG_WARN("order_id not in orders_ {}", order_id);
361 return false;
362 }
363
364 auto order_state = get_order(order_id);
365 if (has_writer(order_state.dest)) {
366 OrderActionError &error = get_writer(order_state.dest)->open_data<OrderActionError>(now());
367 error.order_id = order_state.data.order_id; // 订单ID
368 std::string str_external_order_id = std::to_string(cancel_info.order_xtp_id);
369 error.external_order_id = str_external_order_id.c_str();
370 error.order_action_id = action_id; // 订单操作ID,
371 error.error_id = error_info.error_id; // 错误ID
372 error.error_msg = error_info.error_msg; // 错误信息
373 error.insert_time = time::now_in_nano(); // 写入时间
374 SPDLOG_DEBUG("OrderActionError: {}", error.to_string());
375 get_writer(order_state.dest)->close_data();
376 } else {
377 OrderActionError error{};
378 error.order_id = order_state.data.order_id; // 订单ID
379 std::string str_external_order_id = std::to_string(cancel_info.order_xtp_id);
380 error.external_order_id = str_external_order_id.c_str();
381 error.order_action_id = action_id; // 订单操作ID,
382 error.error_id = error_info.error_id; // 错误ID
383 error.error_msg = error_info.error_msg; // 错误信息
384 error.insert_time = time::now_in_nano(); // 写入时间
385 SPDLOG_DEBUG("OrderActionError: {}", error.to_string());
386 try_write_to(error, order_state.dest);
387 }
388 return true;
389}
390
391void TraderXTP::OnQueryPosition(XTPQueryStkPositionRsp *position, XTPRI *error_info, int request_id, bool is_last,
392 uint64_t session_id) {
393 if (nullptr == position) {
394 SPDLOG_ERROR("XTPQueryStkPositionRsp is nullptr");
395 return;
396 }
397 SPDLOG_TRACE("XTPQueryStkPositionRsp: {}", to_string(*position));
398
399 auto &bf_position = get_thread_writer()->open_custom_data<BufferXTPQueryStkPositionRsp>(kQueryPositionType);
400 memcpy(&bf_position.position, position, sizeof(XTPQueryStkPositionRsp));
401 if (error_info != nullptr) {
402 memcpy(&bf_position.error_info, error_info, sizeof(XTPRI));
403 } else {
404 memset(&bf_position.error_info, 0, sizeof(XTPRI));
405 }
406 bf_position.request_id = request_id;
407 bf_position.is_last = is_last;
408 bf_position.session_id = session_id;
409 get_thread_writer()->close_data();
410}
411
412bool TraderXTP::custom_OnQueryPosition(const event_ptr &event) {
413 const auto &bf_position = event->custom_data<BufferXTPQueryStkPositionRsp>();
414 return custom_OnQueryPosition(bf_position.position, bf_position.error_info, bf_position.request_id,
415 bf_position.is_last, bf_position.session_id);
416}
417
418bool TraderXTP::custom_OnQueryPosition(const XTPQueryStkPositionRsp &position, const XTPRI &error_info, int request_id,
419 bool is_last, uint64_t session_id) {
420 if (error_info.error_id != 0) {
421 SPDLOG_ERROR("error_id:{}, error_msg: {}, request_id: {}, last: {}", error_info.error_id, error_info.error_msg,
422 request_id, is_last);
423 return false;
424 }
425
426 SPDLOG_TRACE("XTPQueryStkPositionRsp: {}", to_string(position));
427 auto writer = get_position_writer();
428 Position &stock_pos = writer->open_data<Position>(0);
429 from_xtp(position, stock_pos);
430 stock_pos.holder_uid = get_home_uid();
431 stock_pos.source_id = get_home_uid();
432 stock_pos.instrument_type = get_instrument_type(stock_pos.exchange_id, stock_pos.instrument_id);
433 stock_pos.direction = Direction::Long;
434 stock_pos.update_time = yijinjing::time::now_in_nano();
435 SPDLOG_TRACE("Position: {}", stock_pos.to_string());
436 writer->close_data();
437 if (is_last) {
438 PositionEnd &end = writer->open_data<PositionEnd>(0);
439 end.holder_uid = get_home_uid();
440 writer->close_data();
441 enable_positions_sync();
442 }
443 return true;
444}
445
446void TraderXTP::OnQueryAsset(XTPQueryAssetRsp *asset, XTPRI *error_info, int request_id, bool is_last,
447 uint64_t session_id) {
448 if (nullptr == asset) {
449 SPDLOG_ERROR("XTPQueryAssetRsp is nullptr");
450 return;
451 }
452 SPDLOG_TRACE("XTPQueryAssetRsp: {}", to_string(*asset));
453
454 auto &bf_asset = get_thread_writer()->open_custom_data<BufferXTPQueryAssetRsp>(kQueryAssetType);
455 memcpy(&bf_asset.asset, asset, sizeof(XTPQueryAssetRsp));
456 if (error_info != nullptr) {
457 memcpy(&bf_asset.error_info, error_info, sizeof(XTPRI));
458 } else {
459 memset(&bf_asset.error_info, 0, sizeof(XTPRI));
460 }
461 bf_asset.request_id = request_id;
462 bf_asset.is_last = is_last;
463 bf_asset.session_id = session_id;
464 get_thread_writer()->close_data();
465}
466
467bool TraderXTP::custom_OnQueryAsset(const event_ptr &event) {
468 const auto &bf_asset = event->custom_data<BufferXTPQueryAssetRsp>();
469 return custom_OnQueryAsset(bf_asset.asset, bf_asset.error_info, bf_asset.request_id, bf_asset.is_last,
470 bf_asset.session_id);
471}
472
473bool TraderXTP::custom_OnQueryAsset(const XTPQueryAssetRsp &asset, const XTPRI &error_info, int request_id,
474 bool is_last, uint64_t session_id) {
475 if (error_info.error_id != 0) {
476 SPDLOG_ERROR("error_id: {}, error_msg: {}, request_id: {}, last: {}", error_info.error_id, error_info.error_msg,
477 request_id, is_last);
478 }
479
480 if (error_info.error_id == 0 || error_info.error_id == 11000350) {
481 SPDLOG_TRACE("OnQueryAsset: {}", to_string(asset));
482 auto writer = get_asset_writer();
483 Asset &account = writer->open_data<Asset>(0);
484 if (error_info.error_id == 0) {
485 from_xtp(asset, account);
486 }
487 account.holder_uid = get_live_home_uid();
488 account.update_time = yijinjing::time::now_in_nano();
489 SPDLOG_TRACE("Asset: {}", account.to_string());
490 writer->close_data();
491 enable_asset_sync();
492 }
493 return true;
494}
495
496bool TraderXTP::req_history_order(const event_ptr &event) {
497 XTPQueryOrderReq query_param{};
498 int request_id = get_request_id();
499 int ret = api_->QueryOrders(&query_param, session_id_, request_id);
500 if (0 != ret) {
501 SPDLOG_ERROR("QueryOrders False: {}", ret);
502 }
503 map_request_location_.emplace(request_id, event->source());
504 return 0 == ret;
505}
506
507bool TraderXTP::req_history_trade(const event_ptr &event) {
508 XTPQueryTraderReq query_param{};
509 int request_id = get_request_id();
510 int ret = api_->QueryTrades(&query_param, session_id_, request_id);
511 if (0 != ret) {
512 SPDLOG_ERROR("QueryTrades False : {}", ret);
513 }
514 map_request_location_.emplace(request_id, event->source());
515 return 0 == ret;
516}
517
518void TraderXTP::OnQueryOrder(XTPQueryOrderRsp *order_info, XTPRI *error_info, int request_id, bool is_last,
519 uint64_t session_id) {
520 SPDLOG_DEBUG("request_id: {}, is_last: {}, session_id: {}", request_id, is_last, session_id);
521 auto &bf_order_info = get_thread_writer()->open_custom_data<BufferXTPOrderInfo>(kQueryXTPOrderInfoType, now());
522 if (order_info != nullptr) {
523 memcpy(&bf_order_info.order_info, order_info, sizeof(XTPOrderInfo));
524 } else {
525 memset(&bf_order_info.order_info, 0, sizeof(XTPOrderInfo));
526 }
527 if (error_info != nullptr) {
528 memcpy(&bf_order_info.error_info, error_info, sizeof(XTPRI));
529 } else {
530 memset(&bf_order_info.error_info, 0, sizeof(XTPRI));
531 }
532 bf_order_info.session_id = session_id;
533 bf_order_info.request_id = request_id;
534 bf_order_info.is_last = is_last;
535 SPDLOG_DEBUG("BufferXTPOrderInfo: {}", to_string(bf_order_info));
536 get_thread_writer()->close_frame(sizeof(BufferXTPOrderInfo));
537}
538
539bool TraderXTP::custom_OnQueryOrder(const event_ptr &event) {
540 const auto *bf_order_info = reinterpret_cast<const BufferXTPOrderInfo *>(event->data_address());
541 return custom_OnQueryOrder(bf_order_info->order_info, bf_order_info->error_info, bf_order_info->request_id,
542 bf_order_info->is_last, bf_order_info->session_id);
543}
544
545bool TraderXTP::custom_OnQueryOrder(const XTPOrderInfo &order_info, const XTPRI &error_info, int request_id,
546 bool is_last, uint64_t session_id) {
547 SPDLOG_DEBUG("XTPOrderInfo: {}", to_string(order_info));
548 SPDLOG_DEBUG("XTPRI: {}", to_string(error_info));
549 SPDLOG_DEBUG("request_id: {}, is_last: {}", request_id, is_last);
550
551 // 查询历史流水收到nullptr, 经过journal走一圈后表现形式为 order_xtp_id == 0
552 if (order_info.order_xtp_id == 0 and is_last and
553 map_request_location_.find(request_id) != map_request_location_.end()) {
554 SPDLOG_WARN("XTPQueryOrderRsp* order_info == nullptr, no data returned!");
555 auto writer = get_history_writer(request_id);
556 HistoryOrder &history_order = writer->open_data<HistoryOrder>();
557 history_order.is_last = true;
558 history_order.data_type = HistoryDataType::TotalEnd;
559 const std::string msg = "No order today";
560 history_order.error_msg = msg.c_str();
561 writer->close_data();
562 SPDLOG_DEBUG("HistoryOrder: {}", history_order.to_string());
563 return false;
564 }
565
566 if (map_request_location_.find(request_id) == map_request_location_.end()) {
567 // TD重连收到推送当做普通下单委托响应处理
568 if (is_last) {
569 req_order_over_ = true;
570 try_ready();
571 }
572 return order_info.order_xtp_id != 0 and custom_OnOrderEvent(order_info, error_info, request_id);
573 }
574
575 auto writer = get_history_writer(request_id);
576 HistoryOrder &history_order = writer->open_data<HistoryOrder>();
577
578 if (error_info.error_id != 0) {
579 SPDLOG_ERROR("OnQueryOrder False , error_code : {}, error_msg : {}", error_info.error_id, error_info.error_msg);
580 history_order.error_id = error_info.error_id;
581 history_order.error_msg = error_info.error_msg;
582 }
583
584 from_xtp(order_info, history_order);
585 history_order.order_id = writer->current_frame_uid();
586 history_order.is_last = is_last;
587 history_order.insert_time = yijinjing::time::now_in_nano();
588 history_order.update_time = history_order.insert_time;
589 SPDLOG_DEBUG("HistoryOrder: {}", history_order.to_string());
590 writer->close_data();
591 return true;
592}
593
594yijinjing::journal::writer_ptr TraderXTP::get_history_writer(uint64_t request_id) {
595 return get_writer(map_request_location_.try_emplace(request_id).first->second);
596}
597
598void TraderXTP::OnQueryTrade(XTPQueryTradeRsp *trade_info, XTPRI *error_info, int request_id, bool is_last,
599 uint64_t session_id) {
600 SPDLOG_DEBUG("request_id: {}, is_last: {}, session_id: {}", request_id, is_last, session_id);
601 auto &bf_trade_info = get_thread_writer()->open_custom_data<BufferXTPTradeReport>(kQueryXTPTradeReportType, now());
602 if (trade_info != nullptr) {
603 memcpy(&bf_trade_info.trade_info, trade_info, sizeof(XTPOrderInfo));
604 } else {
605 memset(&bf_trade_info.trade_info, 0, sizeof(XTPOrderInfo));
606 }
607 if (error_info != nullptr) {
608 memcpy(&bf_trade_info.error_info, error_info, sizeof(XTPRI));
609 } else {
610 memset(&bf_trade_info.error_info, 0, sizeof(XTPRI));
611 }
612 bf_trade_info.session_id = session_id;
613 bf_trade_info.request_id = request_id;
614 bf_trade_info.is_last = is_last;
615 SPDLOG_DEBUG("BufferXTPTradeReport: {}", to_string(bf_trade_info));
616 get_thread_writer()->close_data();
617}
618
619bool TraderXTP::custom_OnQueryTrade(const event_ptr &event) {
620 const auto *bf_trade_info = reinterpret_cast<const BufferXTPTradeReport *>(event->data_address());
621 return custom_OnQueryTrade(bf_trade_info->trade_info, bf_trade_info->error_info, bf_trade_info->request_id,
622 bf_trade_info->is_last, bf_trade_info->session_id);
623}
624
625bool TraderXTP::custom_OnQueryTrade(const XTPTradeReport &trade_info, const XTPRI &error_info, int request_id,
626 bool is_last, uint64_t session_id) {
627 SPDLOG_DEBUG("XTPTradeReport: {}", to_string(trade_info));
628 SPDLOG_DEBUG("XTPRI: {}", to_string(error_info));
629 SPDLOG_DEBUG("request_id: {}, is_last: {}", request_id, is_last);
630
631 // 查询历史流水收到nullptr, 经过journal走一圈后表现形式为 order_xtp_id == 0
632 if (trade_info.order_xtp_id == 0 and is_last and
633 map_request_location_.find(request_id) != map_request_location_.end()) {
634 SPDLOG_WARN("XTPQueryTradeRsp* trade_info == nullptr, no data returned!");
635 auto writer = get_history_writer(request_id);
636 HistoryTrade &history_trade = writer->open_data<HistoryTrade>(now());
637 history_trade.is_last = true;
638 history_trade.data_type = HistoryDataType::TotalEnd;
639 const std::string msg = "No trade today";
640 history_trade.error_msg = msg.c_str();
641 writer->close_data();
642 return false;
643 }
644
645 if (map_request_location_.find(request_id) == map_request_location_.end()) {
646 // TD重连收到推送当做普通交易成交回报推送处理
647 if (is_last) {
648 req_trade_over_ = true;
649 try_ready();
650 }
651 return trade_info.order_xtp_id != 0 and custom_OnTradeEvent(trade_info, session_id);
652 }
653
654 auto writer = get_history_writer(request_id);
655 HistoryTrade &history_trade = writer->open_data<HistoryTrade>(now());
656
657 if (error_info.error_id != 0) {
658 SPDLOG_ERROR("OnQueryTrade False , error_code : {}, error_msg : {}", error_info.error_id, error_info.error_msg);
659 history_trade.error_id = error_info.error_id;
660 history_trade.error_msg = error_info.error_msg;
661 }
662
663 from_xtp(trade_info, history_trade);
664 history_trade.trade_id = writer->current_frame_uid();
665 history_trade.is_last = is_last;
666 history_trade.trade_time = yijinjing::time::now_in_nano();
667 history_trade.instrument_type = get_instrument_type(history_trade.exchange_id, history_trade.instrument_id);
668 SPDLOG_DEBUG("HistoryTrade: {}", history_trade.to_string());
669 writer->close_data();
670 return false;
671}
672
673void TraderXTP::on_recover() {
674 for (auto &pair : get_orders()) {
675 SPDLOG_DEBUG("Order: {}", pair.second.data.to_string());
676 const std::string str_external_order_id = pair.second.data.external_order_id.to_string();
677 if (not str_external_order_id.empty()) {
678 uint64_t order_id = pair.first;
679 uint64_t order_xtp_id = std::stoull(str_external_order_id);
680 map_xtp_to_kf_order_id_.emplace(order_xtp_id, order_id);
681 map_kf_to_xtp_order_id_.emplace(order_id, order_xtp_id);
682 }
683 }
684 for (auto &pair : get_trades()) {
685 SPDLOG_DEBUG("Trade: {}", pair.second.data.to_string());
686 uint64_t order_xtp_id = std::stoull(pair.second.data.external_order_id);
687 map_xtp_order_id_to_xtp_trader_ids_.try_emplace(order_xtp_id)
688 .first->second.emplace(pair.second.data.external_trade_id.to_string());
689 }
690}
691
692void TraderXTP::req_order_trade() {
693 if (disable_recover_) {
694 return try_ready();
695 }
696
697 XTPQueryOrderReq query_order_param{};
698 int ret = api_->QueryOrders(&query_order_param, session_id_, get_request_id());
699 if (0 != ret) {
700 SPDLOG_ERROR("QueryOrders False: {}", ret);
701 }
702
703 XTPQueryTraderReq query_trade_param{};
704 ret = api_->QueryTrades(&query_trade_param, session_id_, get_request_id());
705 if (0 != ret) {
706 SPDLOG_ERROR("QueryTrades False : {}", ret);
707 }
708}
709
710void TraderXTP::try_ready() {
711 if (BrokerState::Ready == get_state()) {
712 return;
713 }
714
715 SPDLOG_DEBUG("req_order_over_: {}, req_trade_over_: {}", req_order_over_, req_trade_over_);
716 if (disable_recover_ or (req_order_over_ and req_trade_over_)) {
717 update_broker_state(BrokerState::Ready);
718 }
719}
720
721void TraderXTP::try_deal_XTPTradeReport(uint64_t xtp_order_id) {
722 auto &xtp_trades = map_xtp_order_id_to_XTPTradeReports_.try_emplace(xtp_order_id).first->second;
723 for (const auto &xtp_trade : xtp_trades) {
724 custom_OnTradeEvent(xtp_trade, session_id_);
725 }
726 xtp_trades.clear();
727}
728
729void TraderXTP::add_XTPTradeReport(const XTPTradeReport &trade_info) {
730 map_xtp_order_id_to_XTPTradeReports_.try_emplace(trade_info.order_xtp_id).first->second.push_back(trade_info);
731}
732
733bool TraderXTP::has_dealt_trade(uint64_t xtp_order_id, const std::string &exec_id) {
734 auto &exec_ids = map_xtp_order_id_to_xtp_trader_ids_.try_emplace(xtp_order_id).first->second;
735 return exec_ids.find(exec_id) != exec_ids.end();
736}
737
738void TraderXTP::add_dealt_trade(uint64_t xtp_order_id, const std::string &exec_id) {
739 map_xtp_order_id_to_xtp_trader_ids_.try_emplace(xtp_order_id).first->second.emplace(exec_id);
740}
741
742bool TraderXTP::on_custom_event(const event_ptr &event) {
743 SPDLOG_DEBUG("msg_type: {}", event->msg_type());
744 switch (event->msg_type()) {
745 case kXTPOrderInfoType:
746 return custom_OnOrderEvent(event);
747 case kXTPTradeReportType:
748 return custom_OnTradeEvent(event);
749 case kQueryXTPOrderInfoType:
750 return custom_OnQueryOrder(event);
751 case kQueryXTPTradeReportType:
752 return custom_OnQueryTrade(event);
753 case kCancelOrderErrorType:
754 return custom_OnCancelOrderError(event);
755 case kQueryAssetType:
756 return custom_OnQueryAsset(event);
757 case kQueryPositionType:
758 return custom_OnQueryPosition(event);
759 default:
760 SPDLOG_ERROR("unrecognized msg_type: {}", event->msg_type());
761 return false;
762 }
763}
764
765void TraderXTP::add_traded_volume(uint64_t order_xtp_id, int64_t trade_volume) {
766 map_xtp_order_id_to_traded_volume_.try_emplace(order_xtp_id).first->second += trade_volume;
767}
768
769int64_t TraderXTP::get_traded_volume(uint64_t order_xtp_id) {
770 return map_xtp_order_id_to_traded_volume_.try_emplace(order_xtp_id).first->second;
771}
772
773void TraderXTP::add_action_id(uint64_t xtp_order_id, int64_t action_id) {
774 map_xtp_order_id_to_action_ids_.try_emplace(xtp_order_id).first->second.push(action_id);
775}
776
777uint64_t TraderXTP::get_action_id(uint64_t xtp_order_id) {
778 auto &action_ids = map_xtp_order_id_to_action_ids_.try_emplace(xtp_order_id).first->second;
779 if (not action_ids.empty()) {
780 uint64_t action_id = action_ids.front();
781 action_ids.pop();
782 SPDLOG_DEBUG("xtp_order_id:action_id = {}:{}", xtp_order_id, action_id);
783 return action_id;
784 } else {
785 SPDLOG_ERROR("action_ids is empty");
786 return 0;
787 }
788}
789
790} // namespace kungfu::wingchun::xtp
marketdata_xtp.h文件
xtp的行情源头文件定义
1#ifndef KUNGFU_XTP_EXT_MARKET_DATA_H
2#define KUNGFU_XTP_EXT_MARKET_DATA_H
3
4#include <kungfu/wingchun/broker/marketdata.h>
5#include <kungfu/yijinjing/common.h>
6#include <xtp_quote_api.h>
7
8namespace kungfu::wingchun::xtp {
9class MarketDataXTP : public XTP::API::QuoteSpi, public broker::MarketData { // XTP market-data source: receives QuoteSpi callbacks and implements broker::MarketData
10public:
11 explicit MarketDataXTP(broker::BrokerVendor &vendor);
12
13 ~MarketDataXTP() override;
14
15 bool subscribe(const std::vector<longfist::types::InstrumentKey> &instrument_keys) override;
16
17 bool subscribe_all() override;
18 bool subscribe_custom(const longfist::types::CustomSubscribe &custom_sub) override;
19 bool unsubscribe(const std::vector<longfist::types::InstrumentKey> &instrument_keys) override { return false; };
20
21 /// Called when the connection between the client and the market-data backend is lost.
22 ///@param reason Error reason; look it up in the error-code table
23 ///@remark
24 /// The API does not reconnect automatically; when a disconnect happens the user decides what to do next. Login may be called from this function to log in again. Note that market data must be re-subscribed after logging in again
25 void OnDisconnected(int reason) override;
26
27 /// Response to a market-data subscription, covering stocks, indices and options
28 ///@param ticker Details of the subscribed instrument
29 ///@param error_info Error details when subscribing to the instrument failed; a null error_info or error_info.error_id == 0 means no error
30 ///@param is_last Whether this is the last response for this subscription: true for the last one, false when more responses follow
31 ///@remark Every subscribed instrument gets its own response; return quickly, otherwise later messages are blocked and severe blocking triggers a disconnect
32 void OnSubMarketData(XTPST *ticker, XTPRI *error_info, bool is_last) override;
33
34 /// Depth market-data notification, including the best-bid and best-ask queues
35 ///@param market_data Market data
36 ///@param bid1_qty Best-bid queue data
37 ///@param bid1_count Number of valid orders in the best-bid queue
38 ///@param max_bid1_count Total number of orders in the best-bid queue
39 ///@param ask1_qty Best-ask queue data
40 ///@param ask1_count Number of valid orders in the best-ask queue
41 ///@param max_ask1_count Total number of orders in the best-ask queue
42 ///@remark Return quickly, otherwise later messages are blocked and severe blocking triggers a disconnect
43 void OnDepthMarketData(XTPMD *market_data, int64_t bid1_qty[], int32_t bid1_count, int32_t max_bid1_count,
44 int64_t ask1_qty[], int32_t ask1_count, int32_t max_ask1_count) override;
45
46 /// Response to a tick-by-tick subscription, covering stocks, indices and options
47 ///@param ticker Details of the subscribed instrument
48 ///@param error_info Error details when subscribing to the instrument failed; a null error_info or error_info.error_id == 0 means no error
49 ///@param is_last Whether this is the last response for this subscription: true for the last one, false when more responses follow
50 ///@remark Every subscribed instrument gets its own response; return quickly, otherwise later messages are blocked and severe blocking triggers a disconnect
51 void OnSubTickByTick(XTPST *ticker, XTPRI *error_info, bool is_last) override;
52
53 /// Tick-by-tick notification, covering stocks, indices and options
54 ///@param tbt_data
55 /// Tick-by-tick data, covering both per-order and per-trade records; this is a shared structure, so use its type to tell per-order from per-trade records. Return quickly, otherwise later messages are blocked and severe blocking triggers a disconnect
56 void OnTickByTick(XTPTBT *tbt_data) override;
57
58 /// Response to subscribing to tick-by-tick stock data for the whole market
59 ///@param exchange_id
60 /// The market covered by this whole-market subscription: XTP_EXCHANGE_UNKNOWN means both the Shanghai and Shenzhen markets, XTP_EXCHANGE_SH means the whole Shanghai market, XTP_EXCHANGE_SZ means the whole Shenzhen market
61 ///@param error_info
62 /// Error details returned when an error occurs while unsubscribing from an instrument (wording as in the original comment); a null error_info or error_info.error_id == 0 means no error
63 ///@remark Return quickly
64 void OnSubscribeAllTickByTick(XTP_EXCHANGE_TYPE exchange_id, XTPRI *error_info) override;
65
66 /// Response to a query for tradable instruments
67 ///@param ticker_info Tradable instrument information
68 ///@param error_info
69 /// Error details returned when the query for tradable instruments failed; a null error_info or error_info.error_id == 0 means no error
70 ///@param is_last
71 /// Whether this is the last response for this tradable-instrument query: true for the last one, false when more responses follow
72 void OnQueryAllTickers(XTPQSI *ticker_info, XTPRI *error_info, bool is_last) override;
73
74 /// Response to a query for full static instrument information
75 ///@param ticker_info Full static instrument information
76 ///@param error_info
77 /// Error details returned when the query for full static instrument information failed; a null error_info or error_info.error_id == 0 means no error
78 ///@param is_last
79 /// Whether this is the last response for this full-static-information query: true for the last one, false when more responses follow
80 void OnQueryAllTickersFullInfo(XTPQFI *ticker_info, XTPRI *error_info, bool is_last) override;
81
82protected:
83 void on_start() override;
84
85 void pre_start() override;
86
87private:
88 XTP::API::QuoteApi *api_{};
89 uint32_t entrust_band_uid_{};
90 uint32_t transaction_band_uid_{};
91
92 inline static thread_local yijinjing::journal::writer_ptr entrust_band_writer_{};
93 inline static thread_local yijinjing::journal::writer_ptr transaction_band_writer_{};
94
95 bool subscribe(const std::vector<std::string> &instruments, const std::string &exchange_id);
96};
97} // namespace kungfu::wingchun::xtp
98
99#endif // KUNGFU_XTP_EXT_MARKET_DATA_H
marketdata_xtp.cpp文件
xtp的行情源函数实现
1#include "marketdata_xtp.h"
2#include "serialize_xtp.h"
3#include "type_convert.h"
4
5using namespace kungfu::yijinjing;
6using namespace kungfu::yijinjing::data;
7
8namespace kungfu::wingchun::xtp {
9struct MDConfiguration { // Settings for the XTP market-data connection, populated from JSON by from_json
10 int client_id; // on_start logs an error when this is outside 1..99
11 std::string account_id; // account passed to QuoteApi::Login
12 std::string password; // password passed to QuoteApi::Login
13 std::string md_ip; // market-data server address passed to Login
14 int md_port; // market-data server port passed to Login
15 std::string protocol; // "udp" or "tcp"; from_json maps every other value to "tcp"
16 int buffer_size; // passed to SetUDPBufferSize when protocol is "udp"; from_json defaults it to 64
17 bool query_instruments; // when true, on_start may query all tickers after login; from_json defaults it to false
18};
19
20void from_json(const nlohmann::json &j, MDConfiguration &c) {
21 j.at("client_id").get_to(c.client_id);
22 j.at("account_id").get_to(c.account_id);
23 j.at("password").get_to(c.password);
24 j.at("md_ip").get_to(c.md_ip);
25 j.at("md_port").get_to(c.md_port);
26 c.protocol = j.value("protocol", "tcp");
27 if (c.protocol != "udp") {
28 c.protocol = "tcp";
29 }
30 c.buffer_size = j.value("buffer_size", 64);
31 c.query_instruments = j.value<bool>("query_instruments", false);
32}
33
34MarketDataXTP::MarketDataXTP(broker::BrokerVendor &vendor) : MarketData(vendor), api_(nullptr) {
35 KUNGFU_SETUP_LOG();
36 SPDLOG_DEBUG("arguments: {}", get_vendor().get_arguments());
37}
38
39MarketDataXTP::~MarketDataXTP() {
40 if (api_ != nullptr) {
41 api_->Release();
42 }
43}
44
45void MarketDataXTP::pre_start() {
46 entrust_band_uid_ = request_band("market-data-band-entrust", 256);
47 transaction_band_uid_ = request_band("market-data-band-transaction", 256);
48}
49
50void MarketDataXTP::on_start() {
51 MDConfiguration config = nlohmann::json::parse(get_config());
52 if (config.client_id < 1 or config.client_id > 99) {
53 SPDLOG_ERROR("client_id must between 1 and 99");
54 }
55 auto md_ip = config.md_ip.c_str();
56 auto account_id = config.account_id.c_str();
57 auto password = config.password.c_str();
58 auto protocol_type = get_xtp_protocol_type(config.protocol);
59 std::string runtime_folder = get_runtime_folder();
60 SPDLOG_INFO("Connecting XTP MD for {} at {}://{}:{}", account_id, config.protocol, md_ip, config.md_port);
61 api_ =
62 XTP::API::QuoteApi::CreateQuoteApi(config.client_id, runtime_folder.c_str(), XTP_LOG_LEVEL::XTP_LOG_LEVEL_INFO);
63 if (config.protocol == "udp") {
64 api_->SetUDPBufferSize(config.buffer_size);
65 }
66 api_->RegisterSpi(this);
67 if (api_->Login(md_ip, config.md_port, account_id, password, protocol_type) == 0) {
68 update_broker_state(BrokerState::LoggedIn);
69 update_broker_state(BrokerState::Ready);
70 SPDLOG_INFO("login success! (account_id) {}", config.account_id);
71 if (config.query_instruments and not check_if_stored_instruments(time::strfnow("%Y%m%d"))) {
72 api_->QueryAllTickers(XTP_EXCHANGE_SH);
73 api_->QueryAllTickers(XTP_EXCHANGE_SZ);
74 api_->QueryAllTickersFullInfo(XTP_EXCHANGE_SH);
75 api_->QueryAllTickersFullInfo(XTP_EXCHANGE_SZ);
76 }
77 } else {
78 update_broker_state(BrokerState::LoginFailed);
79 SPDLOG_ERROR("failed to login, [{}] {}", api_->GetApiLastError()->error_id, api_->GetApiLastError()->error_msg);
80 }
81}
82
83bool MarketDataXTP::subscribe(const std::vector<InstrumentKey> &instrument_keys) {
84 bool result = true;
85 std::vector<std::string> sse_tickers;
86 std::vector<std::string> sze_tickers;
87 for (const auto &inst : instrument_keys) {
88 std::string ticker = inst.instrument_id;
89 if (strcmp(inst.exchange_id, EXCHANGE_SSE) == 0) {
90 sse_tickers.push_back(ticker);
91 } else if (strcmp(inst.exchange_id, EXCHANGE_SZE) == 0) {
92 sze_tickers.push_back(ticker);
93 }
94 }
95 if (!sse_tickers.empty()) {
96 result &= subscribe(sse_tickers, EXCHANGE_SSE);
97 }
98 if (!sze_tickers.empty()) {
99 result &= subscribe(sze_tickers, EXCHANGE_SZE);
100 }
101 return result;
102}
103
104bool MarketDataXTP::subscribe(const std::vector<std::string> &instruments, const std::string &exchange_id) {
105 int size = instruments.size();
106 std::vector<char *> insts;
107 insts.reserve(size);
108 std::transform(instruments.begin(), instruments.end(), std::back_inserter(insts),
109 [](auto &s) { return const_cast<char *>(s.c_str()); });
110 XTP_EXCHANGE_TYPE exchange;
111 to_xtp_exchange(exchange, exchange_id.c_str());
112 int level1_result = api_->SubscribeMarketData(insts.data(), size, exchange);
113 int level2_result = api_->SubscribeTickByTick(insts.data(), size, exchange);
114 SPDLOG_INFO("subscribe {} from {}, l1 rtn code {}, l2 rtn code {}", size, exchange_id, level1_result,
115 level2_result);
116 return level1_result == 0 and level2_result == 0;
117}
118
119bool MarketDataXTP::subscribe_all() {
120 auto result = api_->SubscribeAllMarketData() && api_->SubscribeAllTickByTick();
121 SPDLOG_INFO("subscribe all, rtn code {}", result);
122 return result;
123}
124
125bool MarketDataXTP::subscribe_custom(const longfist::types::CustomSubscribe &custom_sub) {
126 SPDLOG_INFO("custom_sub, market_type {} instrument_type {} data_type {} update_time {}",
127 int(custom_sub.market_type), long(custom_sub.instrument_type), long(custom_sub.data_type),
128 long(custom_sub.update_time));
129 subscribe_all();
130 return true;
131}
132
133void MarketDataXTP::OnDisconnected(int reason) {
134 SPDLOG_ERROR("disconnected with reason {}", reason);
135 update_broker_state(BrokerState::DisConnected);
136}
137
138void MarketDataXTP::OnSubMarketData(XTPST *ticker, XTPRI *error_info, bool is_last) {
139 if (nullptr != ticker) {
140 SPDLOG_DEBUG("XTPST: {}, is_last: {}", to_string(*ticker), is_last);
141 }
142 if (error_info != nullptr && error_info->error_id != 0) {
143 SPDLOG_ERROR("failed to subscribe level 1, [{}] {}", error_info->error_id, error_info->error_msg);
144 }
145}
146
147void MarketDataXTP::OnSubTickByTick(XTPST *ticker, XTPRI *error_info, bool is_last) {
148 if (nullptr != ticker) {
149 SPDLOG_DEBUG("XTPST: {}, is_last: {}", to_string(*ticker), is_last);
150 }
151 if (error_info != nullptr && error_info->error_id != 0) {
152 SPDLOG_ERROR("failed to subscribe level 2, [{}] {}", error_info->error_id, error_info->error_msg);
153 }
154}
155
156void MarketDataXTP::OnSubscribeAllTickByTick(XTP_EXCHANGE_TYPE exchange_id, XTPRI *error_info) {
157 if (error_info != nullptr && error_info->error_id != 0) {
158 SPDLOG_ERROR("failed to subscribe level 2 all, [{}] {}", error_info->error_id, error_info->error_msg);
159 }
160}
161
162void MarketDataXTP::OnQueryAllTickers(XTPQSI *ticker_info, XTPRI *error_info, bool is_last) {
163 if (nullptr != error_info && error_info->error_id != 0) {
164 SPDLOG_ERROR("error_id : {} , error_msg : {}", error_info->error_id, error_info->error_msg);
165 return;
166 }
167
168 if (nullptr == ticker_info) {
169 SPDLOG_ERROR("ticker_info is nullptr");
170 return;
171 }
172
173 Instrument &instrument = get_public_writer()->open_data<Instrument>(0);
174 from_xtp(ticker_info, instrument);
175 get_public_writer()->close_data();
176}
177
178void MarketDataXTP::OnDepthMarketData(XTPMD *market_data, int64_t *bid1_qty, int32_t bid1_count, int32_t max_bid1_count,
179 int64_t *ask1_qty, int32_t ask1_count, int32_t max_ask1_count) {
180 if (nullptr == market_data) {
181 SPDLOG_ERROR("XTPMD is nullptr");
182 }
183
184 Quote "e = get_public_writer()->open_data<Quote>(0);
185 from_xtp(*market_data, quote);
186 get_public_writer()->close_data();
187}
188
189void MarketDataXTP::OnTickByTick(XTPTBT *tbt_data) {
190 if (tbt_data->type == XTP_TBT_ENTRUST) {
191 if (tbt_data->entrust.ord_type == 'D') {
192 if (not transaction_band_writer_) {
193 while (not has_band_writer(transaction_band_uid_)) {
194 std::this_thread::sleep_for(std::chrono::milliseconds(1));
195 }
196 transaction_band_writer_ = get_band_writer(transaction_band_uid_);
197 }
198 Transaction &transaction = transaction_band_writer_->open_data<Transaction>(0);
199 from_xtp(*tbt_data, transaction);
200 transaction_band_writer_->close_data();
201 } else {
202 if (not entrust_band_writer_) {
203 while (not has_band_writer(entrust_band_uid_)) {
204 std::this_thread::sleep_for(std::chrono::milliseconds(1));
205 }
206 entrust_band_writer_ = get_band_writer(entrust_band_uid_);
207 }
208 Entrust &entrust = entrust_band_writer_->open_data<Entrust>(0);
209 from_xtp(*tbt_data, entrust);
210 entrust_band_writer_->close_data();
211 }
212 } else if (tbt_data->type == XTP_TBT_TRADE) {
213 if (not transaction_band_writer_) {
214 while (not has_band_writer(transaction_band_uid_)) {
215 std::this_thread::sleep_for(std::chrono::milliseconds(1));
216 }
217 transaction_band_writer_ = get_band_writer(transaction_band_uid_);
218 }
219 Transaction &transaction = transaction_band_writer_->open_data<Transaction>(0);
220 from_xtp(*tbt_data, transaction);
221 transaction_band_writer_->close_data();
222 }
223}
224
225void MarketDataXTP::OnQueryAllTickersFullInfo(XTPQFI *ticker_info, XTPRI *error_info, bool is_last) {
226 if (nullptr != error_info && error_info->error_id != 0) {
227 SPDLOG_INFO("error_id : {} , error_msg : {}", error_info->error_id, error_info->error_msg);
228 return;
229 }
230
231 if (nullptr == ticker_info) {
232 SPDLOG_ERROR("ticker_info is nullptr");
233 return;
234 }
235
236 Instrument &instrument = get_public_writer()->open_data<Instrument>(0);
237 instrument.instrument_id = ticker_info->ticker;
238 if (ticker_info->exchange_id == 1) {
239 instrument.exchange_id = EXCHANGE_SSE;
240 } else if (ticker_info->exchange_id == 2) {
241 instrument.exchange_id = EXCHANGE_SZE;
242 }
243
244 memcpy(instrument.product_id, ticker_info->ticker_name, strlen(ticker_info->ticker_name));
245 instrument.instrument_type = get_instrument_type(instrument.exchange_id, instrument.instrument_id);
246 SPDLOG_TRACE("instrument {}", instrument.to_string());
247 get_public_writer()->close_data();
248
249 if (is_last) {
250 record_stored_instruments_trading_day(time::strfnow("%Y%m%d"));
251 }
252}
253} // namespace kungfu::wingchun::xtp
exports.cpp文件
xtp柜台和行情源绑定到python模块
1#include "marketdata_xtp.h"
2#include "trader_xtp.h"
3
4#include <kungfu/wingchun/extension.h>
5
// Extension entry point: registers the XTP market-data and trader implementations
// so they are bound into the extension module.
6KUNGFU_EXTENSION() {
// Market-data (MD) side.
7 KUNGFU_DEFINE_MD(kungfu::wingchun::xtp::MarketDataXTP);
// Trader (TD) side.
8 KUNGFU_DEFINE_TD(kungfu::wingchun::xtp::TraderXTP);
9}
编译指令
1# 在package.json所在的目录执行
2yarn build
编译后目录结构:
xtp/ # xtp柜台名称
├── src/
│ └── cpp
│ ├── buffer_data.h
│ ├── exports.cpp
│ ├── marketdata_xtp.cpp
│ ├── marketdata_xtp.h
│ ├── serialize_xtp.h
│ ├── trader_xtp.cpp
│ ├── trader_xtp.h
│ └── type_convert.h
└── package.json # 编译配置信息
--------以下为编译后自动生成内容------------------------------------
├── __kungfulibs__ # 根据package.json的kungfuDependencies下载
│ └── xtp # 柜台名
│ └── v2.2.37.4 # 柜台API版本
│ ├── doc # 柜台文档
│ ├── include # 柜台头文件
│ └── lib # 柜台库文件
├── CMakeLists.txt # 根据package.json自动生成
├── build # 编译中间目录
└── dist # 编译结果
└── xtp
备注
如果是已经添加到Kungfu柜台仓库的柜台API版本, 在执行yarn build之后就可以自动从仓库上下载;
如果柜台API版本不在Kungfu柜台仓库列表中, 需要手动创建以下目录
手动添加自定义的库目录:
├── __kungfulibs__ # kungfu寻找C++库的目录
│ └── {对接的柜台名字} # 柜台名, 需要和package.json中的kungfuDependencies配置的key相同
│ └── {柜台的版本号} # 柜台API版本, 需要和package.json中的kungfuDependencies配置的value相同
│ ├── doc # 柜台文档
│ ├── include # 柜台头文件
│ └── lib # 柜台库文件
启动插件
添加交易柜台
将dist目录下xtp整个目录拷贝到 {kungfu安装目录}/resources/app/kungfu-extensions 目录下, 启动Kungfu图形化界面,
选择主面板中的“交易账户”界面点击添加, 选择xtp模块.

添加行情源
选择主面板中的“行情源”界面点击添加, 选择xtp模块.

策略范例
Python策略范例
源码目录结构:
strategy-python-101/ # 名字随意
├── src/
│ └── python
| └── KungfuStrategy101Python # 必须与package.json中kungfuConfig的key相同
| └── __init__.py # python策略代码,文件名不可更改
└── package.json # 编译配置信息
编译后文件目录结构:
strategy-python-101/
├── src/
│ └── python
| └── KungfuStrategy101Python
| └── __init__.py
├── package.json
├── __pypackages__/ # Python模块库, 自动生成
├── dist/ # 编译打包出来的二进制文件
| └── KungfuStrategy101Python
| └── KungfuStrategy101Python.cp39-win_amd64.pyd # 编译之后的二进制文件
├── pdm.lock # build后下载依赖库自动生成的文件
└── pyproject.toml # build后下载依赖库自动生成的文件
package.json:
{
"name": "@kungfu-trader/examples-strategy-python",
"author": {
"name": "Kungfu Trader",
"email": "info@kungfu.link"
},
"description": "KungFu Strategy 101 - Python Demo",
"license": "Apache-2.0",
"kungfuBuild": {
"python": {
"dependencies": {}
}
},
"kungfuConfig": {
"key": "KungfuStrategy101Python" # key 必须与策略文件(.py文件)所在文件夹的名字相同, 且名字中不能包含 _ 或 - , 例如不能命名为 kungfu-demo 或 kungfu_demo
}
}
__init__.py 策略范例 :
1# __init__.py
2
3import random
4from kungfu.wingchun.constants import *
5import kungfu
6
7lf = kungfu.__binding__.longfist
8wc = kungfu.__binding__.wingchun
9yjj = kungfu.__binding__.yijinjing
10
11source = "sim" # 目标交易账户的柜台名称
12account = "fill" # 目标交易账户的账户号, 需添加 sim 柜台的账户号为 fill 的账户
13md_source = "sim" # 目标行情源的柜台名称, 需添加 sim 行情源
14
15
def pre_start(context):
    """Register the trading account and subscribe to market data before start.

    Also initialises ``context.throttle_insert_order``, the per-instrument
    timestamp map that ``on_quote`` uses to throttle order insertion.
    """
    context.log.info("pre start")
    context.add_account(source, account)

    # (tickers, exchange) pairs, subscribed in this order on the md_source feed.
    subscriptions = (
        (["600000", "600004", "600009"], Exchange.SSE),
        (["300033", "300059"], Exchange.SZE),
        (["rb2401"], Exchange.SHFE),
        (["sc2401"], Exchange.INE),
    )
    for tickers, exchange in subscriptions:
        context.subscribe(md_source, tickers, exchange)

    # To receive bar data, add the bar plugin from the operator panel with
    # id "123", then enable:
    # context.subscribe_operator("bar", "123")
    context.throttle_insert_order = {}
25
26
def post_start(context):
    """Log the uid of the configured trading account once the strategy has started."""
    uid = context.get_account_uid(source, account)
    message = f"account {source} {account}, account_uid: {uid}"
    context.log.info(message)
30
31
def on_quote(context, quote, location, dest):
    """Insert one randomised demo order per instrument, at most once every 10 seconds.

    Side, offset and price type are picked at random. The price is the best ask
    when buying and the best bid when selling. The volume is 3 for futures and
    300 for everything else.
    """
    # context.now() is compared against this in nanoseconds (10s = 10_000_000_000).
    throttle_interval_ns = 10 * 1_000_000_000

    now = context.now()
    last_insert = context.throttle_insert_order.get(quote.instrument_id, 0)
    if now - last_insert < throttle_interval_ns:
        return
    context.throttle_insert_order[quote.instrument_id] = now

    # The side is chosen exactly once (the original drew it twice and discarded the first).
    side = random.choice([Side.Buy, Side.Sell])
    offset = random.choice([Offset.Open, Offset.Close])
    price = quote.ask_price[0] if side == Side.Buy else quote.bid_price[0]
    price_type = random.choice([PriceType.Any, PriceType.Limit])
    volume = 3 if quote.instrument_type == InstrumentType.Future else 300

    order_id = context.insert_order(
        quote.instrument_id,
        quote.exchange_id,
        source,
        account,
        price,
        volume,
        price_type,
        side,
        offset,
    )
    context.log.info(f"insert order: {order_id}")
56
57
# Listen for data broadcast by operators.
def on_synthetic_data(context, synthetic_dataa, location, dest):
    """Log every synthetic data record received."""
    message = f"on_synthetic_data: {synthetic_dataa}"
    context.log.info(message)
61
62
def on_order(context, order, location, dest):
    """Log each order update and cancel the order unless it is already final."""
    context.log.info(f"on_order: {order}, from {location} to {dest}")

    if wc.utils.is_final_status(order.status):
        return
    context.cancel_order(order.order_id)
68
69
def on_trade(context, trade, location, dest):
    """Log every trade received together with its source and destination."""
    message = f"on_trade: {trade}, from {location} to {dest}"
    context.log.info(message)
通过主面板的 策略进程->添加->策略路径 选择 KungfuStrategy101Python.cp39-win_amd64.pyd, 点击启动就可以运行Python编译后的策略代码
CPP策略范例
源码目录结构:
strategy-cpp-101/
├── src/
│ └── cpp
| └── strategy.cpp # cpp策略代码
└── package.json # 编译配置信息
编译后文件目录结构:
strategy-cpp-101/
├── src/
│ └── cpp
| └── strategy.cpp # cpp策略代码
├── package.json
├── dist/ # 编译打包出来的二进制文件
| └── KungfuStrategy101Cpp
| └── KungfuStrategy101Cpp.cp39-win_amd64.pyd # 编译之后的二进制文件
└── build # build 编译生成中间文件
package.json:
{
"name": "@kungfu-trader/examples-strategy-cpp",
"author": "kungfu-trader",
"description": "KungFu Strategy 101 - C++ Demo",
"license": "Apache-2.0",
"kungfuConfig": {
"key": "KungfuStrategy101Cpp" # 编译之后的二进制文件所在文件夹名
},
"kungfuBuild": { # 打包模块相关
"cpp": {
"target": "bind/python"
}
}
}
1// strategy.cpp
2#include <kungfu/wingchun/extension.h>
3#include <kungfu/wingchun/strategy/context.h>
4#include <kungfu/wingchun/strategy/strategy.h>
5
6using namespace kungfu::longfist::enums;
7using namespace kungfu::longfist::types;
8using namespace kungfu::wingchun::strategy;
9using namespace kungfu::yijinjing::data;
10
11KUNGFU_MAIN_STRATEGY(KungfuStrategy101) {
12public:
13KungfuStrategy101() = default;
14~KungfuStrategy101() = default;
15
16void pre_start(Context_ptr & context) override {
17 SPDLOG_INFO("preparing strategy");
18 SPDLOG_INFO("arguments: {}", context->get_arguments());
19
20 context->add_account("sim", "fill");
21 context->subscribe("sim", {"600000"}, {"SSE"});
22}
23
24void post_start(Context_ptr & context) override { SPDLOG_INFO("strategy started"); }
25
26void on_quote(Context_ptr & context, const Quote "e, const location_ptr &location, uint32_t dest) override {
27 SPDLOG_INFO("Quote: {} location: {}", quote.to_string(), location->to_string());
28 context->insert_order(quote.instrument_id, quote.exchange_id, "sim", "fill", quote.last_price, 200,
29 PriceType::Limit, Side::Buy, Offset::Open);
30}
31
32void on_order(Context_ptr & context, const Order &order, const location_ptr &location, uint32_t dest) override {
33 SPDLOG_INFO("Order: {}", order.to_string());
34}
35
36void on_trade(Context_ptr & context, const Trade &trade, const location_ptr &location, uint32_t dest) override {
37 SPDLOG_INFO("Trade: {}", trade.to_string());
38}
39
40void on_tree(Context_ptr & context, const Tree &tree, const location_ptr &location, uint32_t dest) override {
41 SPDLOG_INFO("on tree: {}", tree.to_string());
42}
43
44void on_synthetic_data(Context_ptr & context, const SyntheticData &synthetic_data, const location_ptr &location,
45 uint32_t dest) override {
46 SPDLOG_INFO("on_synthetic_data: {} ", synthetic_data.to_string());
47}
48
49void on_broker_state_change(Context_ptr & context, const BrokerStateUpdate &broker_state_update,
50 const location_ptr &location) override {
51 SPDLOG_INFO("on broker state changed: {}", broker_state_update.to_string());
52}
53
54void on_operator_state_change(Context_ptr & context, const OperatorStateUpdate &operator_state_update,
55 const location_ptr &location) override {
56 SPDLOG_INFO("on operator state changed: {}", operator_state_update.to_string());
57}
58};
通过主面板的 策略进程->添加->策略路径 选择 KungfuStrategy101Cpp.cp39-win_amd64.pyd, 点击启动就可以运行C++编译后的策略代码
CPP策略可执行程序范例
源码目录结构:
strategy-cpp-101-exe/
├── src/
│ └── cpp
| └── strategy.cpp # cpp策略代码
└── package.json # 编译配置信息
编译后文件目录结构:
strategy-cpp-101-exe/
├── src/
│ └── cpp
| └── strategy.cpp # cpp策略代码
├── package.json
├── dist/ # 编译打包出来的二进制文件
| └── KungfuStrategy101CppExe
| └── KungfuStrategy101CppExe.exe # 可执行文件
└── build # build 编译生成中间文件
package.json:
{
"name": "@kungfu-trader/examples-strategy-cpp",
"author": "kungfu-trader",
"description": "KungFu Strategy 101 - C++ Demo",
"license": "Apache-2.0",
"kungfuConfig": {
"key": "KungfuStrategy101CppExe"
},
"kungfuBuild": {
"cpp": {
"target": "exe"
}
}
}
1// strategy.cpp
2#include <kungfu/wingchun/strategy/context.h>
3#include <kungfu/wingchun/strategy/runner.h>
4#include <kungfu/wingchun/strategy/strategy.h>
5
6using namespace kungfu::longfist::enums;
7using namespace kungfu::longfist::types;
8using namespace kungfu::wingchun::strategy;
9using namespace kungfu::yijinjing::data;
10
11class KungfuStrategy101 : public Strategy {
12public:
13KungfuStrategy101() = default;
14~KungfuStrategy101() = default;
15
16void pre_start(Context_ptr &context) override {
17 SPDLOG_INFO("preparing strategy");
18 SPDLOG_INFO("arguments: {}", context->get_arguments());
19
20 context->add_account("sim", "fill");
21 context->subscribe("sim", {"600000"}, {"SSE"});
22}
23
24void post_start(Context_ptr &context) override { SPDLOG_INFO("strategy started"); }
25
26void on_quote(Context_ptr &context, const Quote "e, const location_ptr &location, uint32_t dest) override {
27 SPDLOG_INFO("Quote: {} location: {}", quote.to_string(), location->to_string());
28 context->insert_order(quote.instrument_id, quote.exchange_id, "sim", "fill", quote.last_price, 200,
29 PriceType::Limit, Side::Buy, Offset::Open);
30}
31
32void on_order(Context_ptr &context, const Order &order, const location_ptr &location, uint32_t dest) override {
33 SPDLOG_INFO("Order: {}", order.to_string());
34}
35
36void on_trade(Context_ptr &context, const Trade &trade, const location_ptr &location, uint32_t dest) override {
37 SPDLOG_INFO("Trade: {}", trade.to_string());
38}
39
40void on_tree(Context_ptr &context, const Tree &tree, const location_ptr &location, uint32_t dest) override {
41 SPDLOG_INFO("on tree: {}", tree.to_string());
42}
43
44void on_synthetic_data(Context_ptr &context, const SyntheticData &synthetic_data, const location_ptr &location,
45 uint32_t dest) override {
46 SPDLOG_INFO("on_synthetic_data: {} ", synthetic_data.to_string());
47}
48
49void on_broker_state_change(Context_ptr &context, const BrokerStateUpdate &broker_state_update,
50 const location_ptr &location) override {
51 SPDLOG_INFO("on broker state changed: {}", broker_state_update.to_string());
52}
53
54void on_operator_state_change(Context_ptr &context, const OperatorStateUpdate &operator_state_update,
55 const location_ptr &location) override {
56 SPDLOG_INFO("on operator state changed: {}", operator_state_update.to_string());
57}
58};
59
60int main(int argc, char **argv) {
61 SPDLOG_INFO("runner1 add strategy1");
62 Runner runner(std::make_shared<locator>(), "CppStrategy", "demo01exe", mode::LIVE, false);
63 SPDLOG_INFO("runner");
64 runner.add_strategy(std::make_shared<KungfuStrategy101>());
65 runner.run();
66 SPDLOG_INFO("Over");
67 return 0;
68}
直接运行 KungfuStrategy101CppExe.exe 程序就可以运行以上策略
小技巧
需要将 {Kungfu安装目录}/resources/kfc/ 目录下的Kungfu.dll放在KungfuStrategy101CppExe.exe同一个目录下, 或者配置系统环境变量使得程序运行时可以找到动态库Kungfu.dll
尽量使用命令行运行; 鼠标双击运行时, 程序退出后命令行界面会自动关闭, 只能到Kungfu运行home目录下找到对应log文件才能查看运行日志