柜台对接
前置注意事项
本节主要介绍以下内容,
Kungfu共享内存结构, 理解该结构有助于处理柜台的参数 kungfu::event_ptr&
kungfu::event_ptr 的内容和使用方式
柜台回调函数的多线程处理, 通过journal把数据转到主线程处理, 避免加锁, 同时支持回放
kungfu共享内存结构
kungfu框架中进程间通信采用共享内存队列的形式, 一个进程将数据写入共享内存文件, 其他多个进程可以同时读取.
一条信道对应一个逻辑上的journal文件, 表示逻辑上一个无限大的队列.
具体实现时将一个journal切分为多个固定大小的page, 只要磁盘足够大就可以无限的增加page的个数, 每一个page文件的命名规则为 “dest_id的十六进制进程.page数.journal”.
存放在写入进程的目录下, 实盘模式下写入进程的journal所属目录命名规则为 “$KF_HOME/runtime/journal/{category}/{group}/{name}/{mode}”.
范例
1$KF_HOME/runtime/journal/td/CTP/089270/live, 表示实盘模式的CTP柜台对应账户为089270的共享内存文件目录
每一个page存放具体的数据, 每一个数据视为一个frame, frame由frame_header和data组成.
journal的结构如下图所示,

kungfu::event_ptr
在kungfu系统中, 所有的”typename_ptr”都实际表示std::shared_ptr<typename>, 即kungfu::event_ptr表示std::shared_ptr<kungfu::event>.
当A进程往共享内存里写入一个数据时, 会生成一个frame, 里面包含了frame_header和具体的数据内容, 当B进程读到该数据时, 也会将frame_header和后面的数据封装成一个frame, kungfu::event_ptr就是指向这个frame的指针, B进程会根据frame_header里面的数据类型msg_type触发不同的函数.
例如当策略调用下单时, 会往共享内存里写入一个OrderInput数据, TD读取到这个数据时, 会把frame_header和OrderInput封装在一个frame里, 然后调用虚函数insert_order函数, 并且把指向这个frame的指针kungfu::event_ptr作为参数传入, 在override的insert_order函数里通过调用event的对应函数, 可以获取到这个frame里面的frame_header和具体数据OrderInput的信息.
范例
1//ctp的insert_order实现
2bool TraderCTP::insert_order(const event_ptr &event) {
3 const OrderInput &input = event->data<OrderInput>();
4 SPDLOG_DEBUG("OrderInput: {}", input.to_string());
5
6 int request_id = get_request_id();
7 int error_id = 0;
8 uint64_t orderRef_key = 0;
9
10 CThostFtdcInputOrderField ctp_input{};
11 to_ctp(ctp_input, input);
12 strcpy(ctp_input.BrokerID, config_.broker_id.c_str());
13 strcpy(ctp_input.InvestorID, config_.account_id.c_str());
14 strcpy(ctp_input.OrderRef, std::to_string(++order_ref_).c_str());
15 SPDLOG_DEBUG("CThostFtdcInputOrderField: {}", to_string(ctp_input));
16 error_id = api_->ReqOrderInsert(&ctp_input, request_id);
17 orderRef_key = get_orderRef_key(front_id_, session_id_, ctp_input.OrderRef);
18
19 auto nano = time::now_in_nano();
20 auto writer = get_writer(event->source());
21 Order &order = writer->open_data<Order>(event->gen_time());
22 order_from_input(input, order);
23 order.insert_time = nano;
24 order.update_time = nano;
25 order.time_condition = from_ctp_time_condition(ctp_input.TimeCondition);
26
27 if (error_id != 0) {
28 order.error_id = error_id;
29 order.status = OrderStatus::Error;
30 }
31
32 SPDLOG_DEBUG("Order: {}", order.to_string());
33 writer->close_data();
34 map_request_id_to_kf_order_id_.insert_or_assign(request_id, uint64_t(input.order_id));
35 map_OrderRefKey_to_kf_order_id_.insert_or_assign(orderRef_key, uint64_t(input.order_id));
36 map_kf_order_id_to_OrderRef_.insert_or_assign(input.order_id, ctp_input.OrderRef);
37 return error_id == 0;
38}
柜台回调多线程处理
一般情况下, 我们主动调用柜台API接口发送数据, 和柜台SPI回调接口是在两个不同的线程, 有的柜台API接口响应函数和数据推送函数是不同的线程, 此时是有三个线程.
对于TD我们提交委托和收到委托响应, 以及收到成交推送时, 都需要访问map <柜台委托号, Kungfu委托号> 来确定成交和委托的映射关系, 一般情况下都会选择使用加锁的方式来保证线程安全.
例如, 针对于异步下单的场景, 我们需要先构建 <下单跟踪信息, Kungfu委托号> map映射关系, 在委托报单响应回调里面访问这个map再构建 <API委托号, Kungfu委托号> 映射关系, 当收到成交推送的时候才能找到这笔成交属于Kungfu的哪一笔委托, 此时如果不对相关的容器进行加锁, 最容易遇到的情况就是一个线程插入数据导致stl扩容, 原来的内存地址失效, 此时另一个线程正在访问原来的内存地址, 此时会导致整个进程崩溃.
虽然通过加锁可以解决TD这一层的线程安全问题, 但是我们整个Kungfu框架的底层设计是针对于多进程单线程设计的, 底层的容器的访问绝大部分都没有加锁, API子线程和Kungfu主线程同时访问同一个容器时, 仍然存在线程安全问题 例如, 当柜台API成交推送在子线程推送成交数据时, 在子线程里调用get_writer(策略A)去把成交信息写给策略A, 而此时有策略B启动申请和TD通信, 主线程刚好需要添加TD写给策略B的writer, 此时有可能会导致get_writer(策略A)访问的容器触发扩容内存失效, 导致内存崩溃, 虽然这种概率非常小, 但是也存在风险.
为了规避多线程安全问题, 以及防止频繁的加锁解锁带来的性能损耗, 我们统一将TD子线程推送的数据通过共享内存传输给主线程处理. 其实现方式为, 定义一个 inline static thread_local journal::writer_ptr thread_writer_; 给每一个子线程, 调用 get_thread_writer() 会获取属于当前线程的writer, Kungfu底层会做好主线程读取的处理. 当子线程回调触发时, 调用该函数, 直接将数据原原本本的写入到共享内存, 主线程读取到后再做后续的业务处理, 这样做的好处有两个,
所有的容器访问都在主线程, 不需要加锁
柜台APi的原始数据全部落地保存, 可以随时使用回放功能查看柜台原始数据
针对于共享内存中的每一个数据, Kugnfu会针对于frame_header里的msg_type来执行不同处理函数, 使用thread_writer将数据从子线程转到主线程处理时, 需要自己针对于不同的柜台API数据定义msg_type, 在TD进程中, 所有不属于Kungfu系统定义的msg_type都会执行 virtual bool on_custom_event(const event_ptr &event); 回调. 在override中再分配针对于自定义msg_type的处理.
柜台范例, XTP的Order和Trade的回调处理
1// 建议使用大于一百万的整型数作为自定义的msg_type
2static constexpr int32_t kXTPOrderInfoType = 12340001;
3static constexpr int32_t kXTPTradeReportType = 12340002;
4static constexpr int32_t kQueryXTPOrderInfoType = 12340003;
5static constexpr int32_t kQueryXTPTradeReportType = 12340004;
6static constexpr int32_t kCancelOrderErrorType = 12340005;
7static constexpr int32_t kQueryAssetType = 12340011;
8static constexpr int32_t kQueryPositionType = 12340012;
9
10// 自定义msg_type数据回调, 根据对应msg_type执行对应的处理
11bool TraderXTP::on_custom_event(const event_ptr &event) {
12 SPDLOG_DEBUG("msg_type: {}", event->msg_type());
13 switch (event->msg_type()) {
14 case kXTPOrderInfoType:
15 return custom_OnOrderEvent(event);
16 case kXTPTradeReportType:
17 return custom_OnTradeEvent(event);
18 case kQueryXTPOrderInfoType:
19 return custom_OnQueryOrder(event);
20 case kQueryXTPTradeReportType:
21 return custom_OnQueryTrade(event);
22 case kCancelOrderErrorType:
23 return custom_OnCancelOrderError(event);
24 case kQueryAssetType:
25 return custom_OnQueryAsset(event);
26 case kQueryPositionType:
27 return custom_OnQueryPosition(event);
28 default:
29 SPDLOG_ERROR("unrecognized msg_type: {}", event->msg_type());
30 return false;
31 }
32}
33
34// 对于回调参数为多个的回调接口, 需要定义一个struct一起存放
35// 成交推送和查询历史成交的数据结构相似, 共用一个struct, 多出来的字段查询历史成交用的
36struct BufferXTPTradeReport {
37 XTPQueryTradeRsp trade_info;
38 XTPRI error_info;
39 int request_id;
40 bool is_last;
41 uint64_t session_id;
42};
43
44// 委托回调和查询历史委托的数据结构相似, 共用一个struct, 多出来的字段查询历史委托用的
45struct BufferXTPOrderInfo {
46 XTPOrderInfo order_info;
47 XTPRI error_info;
48 int request_id;
49 bool is_last;
50 uint64_t session_id;
51};
52
53// xtp的子线程Order回调函数, 直接将数据写入共享内存, 不做任何业务处理
54void TraderXTP::OnOrderEvent(XTPOrderInfo *order_info, XTPRI *error_info, uint64_t session_id) {
55 if (nullptr == order_info) {
56 SPDLOG_ERROR("XTPOrderInfo is nullptr");
57 return;
58 }
59 SPDLOG_DEBUG("XTPOrderInfo: {}", to_string(*order_info));
60 auto &bf_order_info = get_thread_writer()->open_custom_data<BufferXTPOrderInfo>(kXTPOrderInfoType, now());
61 memcpy(&bf_order_info.order_info, order_info, sizeof(XTPOrderInfo));
62 bf_order_info.session_id = session_id;
63 if (error_info != nullptr) {
64 memcpy(&bf_order_info.error_info, error_info, sizeof(XTPRI));
65 } else {
66 memset(&bf_order_info.error_info, 0, sizeof(XTPRI));
67 }
68 SPDLOG_DEBUG("BufferXTPOrderInfo: {}", to_string(bf_order_info));
69 get_thread_writer()->close_data();
70}
71
72// 主线程处理, 将自定义的struct读出, 对应的数据分解回原来的样子, 传给对应的Order处理函数
73bool TraderXTP::custom_OnOrderEvent(const event_ptr &event) {
74 const auto *bf_order_info = reinterpret_cast<const BufferXTPOrderInfo *>(event->data_address());
75 return custom_OnOrderEvent(bf_order_info->order_info, bf_order_info->error_info, bf_order_info->session_id);
76}
77
78// 主线程处理, API接口回调参数是指针, 需要做指针判空处理, 当触发该处理函数时一定是有有效数据的, 改成使用引用免去指针判空
79bool TraderXTP::custom_OnOrderEvent(const XTPOrderInfo &order_info, const XTPRI &error_info, uint64_t session_id) {
80 SPDLOG_DEBUG("XTPOrderInfo: {}", to_string(order_info));
81 SPDLOG_DEBUG("session_id: {}, XTPRI: {}", session_id, to_string(error_info));
82
83 // 所有容器访问不需要加锁
84 auto order_xtp_id_iter = map_xtp_to_kf_order_id_.find(order_info.order_xtp_id);
85 if (order_xtp_id_iter == map_xtp_to_kf_order_id_.end()) {
86 SPDLOG_WARN("unrecognized order_xtp_id {}@{}", order_info.order_xtp_id, trading_day_);
87 return generate_external_order(order_info);
88 }
89
90 uint64_t kf_order_id = order_xtp_id_iter->second;
91 if (not has_order(kf_order_id)) {
92 return generate_external_order(order_info);
93 }
94
95 auto &order_state = get_order(kf_order_id);
96 if (not is_final_status(order_state.data.status) or order_state.data.status == OrderStatus::Lost) {
97 from_xtp_no_price_type(order_info, order_state.data);
98 order_state.data.update_time = yijinjing::time::now_in_nano();
99 if (error_info.error_id != 0) {
100 order_state.data.error_id = error_info.error_id;
101 order_state.data.error_msg = error_info.error_msg;
102 }
103 try_write_to(order_state.data, order_state.dest);
104 SPDLOG_DEBUG("Order: {}", order_state.data.to_string());
105 try_deal_XTPTradeReport(order_info.order_xtp_id);
106 }
107 return true;
108}
109
110
111// xtp的子线程成交推送回调函数, 直接将数据写入共享内存, 不做任何业务处理
112void TraderXTP::OnTradeEvent(XTPTradeReport *trade_info, uint64_t session_id) {
113 if (nullptr == trade_info) {
114 SPDLOG_ERROR("XTPTradeReport is nullptr");
115 return;
116 }
117 SPDLOG_DEBUG("XTPTradeReport: {}", to_string(*trade_info));
118
119 auto &bf_trade_info = get_thread_writer()->open_custom_data<BufferXTPTradeReport>(kXTPTradeReportType, now());
120 memcpy(&bf_trade_info.trade_info, trade_info, sizeof(XTPTradeReport));
121 bf_trade_info.session_id = session_id;
122 SPDLOG_DEBUG("BufferXTPOrderInfo: {}", to_string(bf_trade_info));
123 get_thread_writer()->close_data();
124}
125
126// 主线程处理, 将自定义的struct读出, 对应的数据分解回原来的样子, 传给对应的Trade处理函数
127bool TraderXTP::custom_OnTradeEvent(const event_ptr &event) {
128 const auto *bf_trade_info = reinterpret_cast<const BufferXTPTradeReport *>(event->data_address());
129 return custom_OnTradeEvent(bf_trade_info->trade_info, bf_trade_info->session_id);
130}
131
132
133// 主线程处理, API接口回调参数是指针, 需要做指针判空处理, 当触发该处理函数时一定是有有效数据的, 改成使用引用免去指针判空
134bool TraderXTP::custom_OnTradeEvent(const XTPTradeReport &trade_info, uint64_t session_id) {
135 SPDLOG_DEBUG("XTPTradeReport: {}", to_string(trade_info));
136 SPDLOG_DEBUG("session_id: {}", session_id);
137
138 // 所有容器访问不需要加锁
139 auto order_xtp_id_iter = map_xtp_to_kf_order_id_.find(trade_info.order_xtp_id);
140 if (order_xtp_id_iter == map_xtp_to_kf_order_id_.end()) {
141 SPDLOG_WARN("unrecognized order_xtp_id {}, store in map_xtp_order_id_to_XTPTradeReports_", trade_info.order_xtp_id);
142 add_XTPTradeReport(trade_info);
143 return false;
144 }
145
146 if (has_dealt_trade(trade_info.order_xtp_id, trade_info.exec_id)) {
147 SPDLOG_DEBUG("order_xtp_id:{}, exec_id: {}, has dealt", trade_info.order_xtp_id, trade_info.exec_id);
148 return false;
149 }
150
151 uint64_t kf_order_id = order_xtp_id_iter->second;
152 if (not has_order(kf_order_id)) {
153 SPDLOG_ERROR("no order_id {} in orders_", kf_order_id);
154 return false;
155 }
156
157 add_dealt_trade(trade_info.order_xtp_id, trade_info.exec_id);
158 auto &order_state = get_order(kf_order_id);
159
160 if (has_writer(order_state.dest)) {
161 auto writer = get_writer(order_state.dest);
162 Trade &trade = writer->open_data<Trade>(now());
163 from_xtp(trade_info, trade);
164 trade.trade_id = writer->current_frame_uid();
165 trade.order_id = kf_order_id;
166 add_traded_volume(trade_info.order_xtp_id, trade.volume);
167 SPDLOG_DEBUG("Trade: {}", trade.to_string());
168 writer->close_data();
169 } else {
170 Trade trade{};
171 from_xtp(trade_info, trade);
172 trade.trade_id = get_public_writer()->current_frame_uid() xor (time::now_in_nano() & 0xFFFFFFFF);
173 trade.order_id = kf_order_id;
174 add_traded_volume(trade_info.order_xtp_id, trade.volume);
175 SPDLOG_DEBUG("Trade: {}", trade.to_string());
176 try_write_to(trade, order_state.dest);
177 }
178
179 if (not is_final_status(order_state.data.status) or order_state.data.status == OrderStatus::Lost) {
180 order_state.data.volume_left = std::min<int64_t>(
181 order_state.data.volume_left, order_state.data.volume - get_traded_volume(trade_info.order_xtp_id));
182 if (order_state.data.volume_left > 0) {
183 order_state.data.status = OrderStatus::PartialFilledActive;
184 }
185 order_state.data.update_time = now();
186 SPDLOG_DEBUG("Order: {}", order_state.data.to_string());
187 try_write_to(order_state.data, order_state.dest);
188 }
189 return true;
190}
Broker配置和启动流程
Broker配置
TD和MD的账户信息配置通过package.json来设置, 详情参考 package.json配置信息 一节.
通过前端界面配置的账户信息, 可以通过get_config()获取到一个json格式的字符串, 可以直接使用Kungfu引入的nlohmann json库进行解析, 使用方式有两种,
直接使用json对象通过key和value键值对获取前端输入的信息.
定义一个struct并且配置from_json函数, nlohmann json库可以直接将json格式的字符串转换成对应的struct对象.
建议使用第二种, 以下是xtp的范例
1// 自定义一个struct
2struct MDConfiguration {
3 int client_id;
4 std::string account_id;
5 std::string password;
6 std::string md_ip;
7 int md_port;
8 std::string protocol;
9 int buffer_size;
10 bool query_instruments;
11};
12
13// 定义如何从json转换成struct对象
14void from_json(const nlohmann::json &j, MDConfiguration &c) {
15 j.at("client_id").get_to(c.client_id);
16 j.at("account_id").get_to(c.account_id);
17 j.at("password").get_to(c.password);
18 j.at("md_ip").get_to(c.md_ip);
19 j.at("md_port").get_to(c.md_port);
20 c.protocol = j.value("protocol", "tcp");
21 if (c.protocol != "udp") {
22 c.protocol = "tcp";
23 }
24 c.buffer_size = j.value("buffer_size", 64);
25 c.query_instruments = j.value<bool>("query_instruments", false);
26}
27
28// 获取前端配置的参数信息
29void MarketDataXTP::on_start() {
30 MDConfiguration config = nlohmann::json::parse(get_config());
31 // 其他处理代码
32}
1{
2 "name": "@kungfu-trader/kfx-broker-xtp-demo",
3 "author": {
4 "name": "Kungfu Trader",
5 "email": "info@kungfu.link"
6 },
7 "version": "2.7.5-alpha.14",
8 "description": "Kungfu Extension - XTP Demo",
9 "license": "Apache-2.0",
10 "main": "package.json",
11 "repository": {
12 "url": "https://github.com/kungfu-trader/kungfu.git"
13 },
14 "publishConfig": {
15 "registry": "https://npm.pkg.github.com"
16 },
17 "binary": {
18 "module_name": "kfx-broker-xtp-demo",
19 "module_path": "dist/xtp",
20 "remote_path": "{module_name}/v{major}/v{version}",
21 "package_name": "{module_name}-v{version}-{platform}-{arch}-{configuration}.tar.gz",
22 "host": "https://prebuilt.libkungfu.cc"
23 },
24 "scripts": {
25 "build": "kfs extension build",
26 "clean": "kfs extension clean",
27 "format": "node ../../framework/core/.gyp/run-format-cpp.js src",
28 "install": "node -e \"require('@kungfu-trader/kungfu-core').prebuilt('install')\"",
29 "package": "kfs extension package"
30 },
31 "dependencies": {
32 "@kungfu-trader/kungfu-core": "^2.7.5-alpha.14"
33 },
34 "devDependencies": {
35 "@kungfu-trader/kungfu-sdk": "^2.7.5-alpha.14"
36 },
37 "kungfuDependencies": {
38 "xtp": "v2.2.37.4"
39 },
40 "kungfuBuild": {
41 "build_type": "Release",
42 "cpp": {
43 "target": "bind/python",
44 "links": {
45 "windows": [
46 "xtptraderapi",
47 "xtpquoteapi"
48 ],
49 "linux": [
50 "xtptraderapi",
51 "xtpquoteapi"
52 ],
53 "macos": [
54 "xtptraderapi",
55 "xtpquoteapi"
56 ]
57 }
58 }
59 },
60 "kungfuConfig": {
61 "key": "xtp",
62 "name": "XTP",
63 "language": {
64 "zh-CN": {
65 "account_name": "账户别名",
66 "account_name_tip": "请填写账户别名",
67 "account_id": "账户",
68 "account_id_tip": "请填写账户, 例如:1504091",
69 "password": "密码",
70 "password_tip": "请填写密码, 例如:123456",
71 "software_key": "软件密钥",
72 "software_key_tip": "请填写软件密钥, 例如:b8aa7173bba3470e390d787219b2112",
73 "td_ip": "交易IP",
74 "td_ip_tip": "请填写交易IP, 例如:61.152.102.111",
75 "td_port": "交易端口",
76 "td_port_tip": "请填写交易端口, 例如:8601",
77 "client_id": "客户ID",
78 "client_id_tip": "请填写自定义多点登录ID 1-99整数,不同节点之间不要重复",
79 "md_ip": "行情IP",
80 "md_ip_tip": "请填写行情IP, 例如:61.152.102.110",
81 "md_port": "行情端口",
82 "md_port_tip": "请填写行情端口, 例如:8602",
83 "protocol": "协议",
84 "protocol_tip": "请选择协议, tcp 或者 udp",
85 "buffer_size": "缓冲区大小",
86 "buffer_size_tip": "请填写 缓冲区大小(mb)",
87 "sync_external_order": "同步外部订单",
88 "sync_external_order_msg": "是否同步外部订单",
89 "sync_external_order_tip": "若开启则同步用户在其他交易软件的订单",
90 "recover_order_trade": "恢复订单",
91 "recover_order_trade_msg": "启动时是否查询恢复订单",
92 "recover_order_trade_tip": "若开启则启动时查询今日委托和成交",
93 "query_instruments": "查询可交易标的",
94 "query_instruments_tip": "是否查询可交易标的, 开启后会查询所有可交易标的, 流量太大频繁查询可能导致账号或ip被XTP拉黑"
95 },
96 "en-US": {
97 "account_name": "account name",
98 "account_name_tip": "Please enter account name",
99 "account_id": "account id",
100 "account_id_tip": "Please enter account id",
101 "password": "password",
102 "password_tip": "Please enter password, for example:123456",
103 "software_key": "software key",
104 "software_key_tip": "Please enter software key, for example :b8aa7173bba3470e390d787219b2112",
105 "td_ip": "td IP",
106 "td_ip_tip": "Please enter td IP, for example:61.152.102.111",
107 "td_port": "td port",
108 "td_port_tip": "Please enter td port, for example:8601",
109 "client_id": "client id",
110 "client_id_tip": "Please enter t user-defined multipoint client ID, which is an integer ranging from 1 to 99. The value must be unique on different nodes",
111 "md_ip": "md IP",
112 "md_ip_tip": "Please enter md IP, for example :61.152.102.110",
113 "md_port": "md port",
114 "md_port_tip": "Please enter md port, for example:8602",
115 "protocol": "protocol",
116 "protocol_tip": "Please select protocol, tcp or udp",
117 "buffer_size": "buffer size",
118 "buffer_size_tip": "Please enter buffer size(mb)",
119 "sync_external_order": "sync external order",
120 "sync_external_order_msg": "Whether open sync_external_order",
121 "sync_external_order_tip": "If enabled, it synchronizes users' orders in other trading software",
122 "recover_order_trade": "recover order trade",
123 "recover_order_trade_msg": "Whether recover order trade",
124 "recover_order_trade_tip": "If enabled, query order and trade when TD ready",
125 "query_instruments": "query instruments",
126 "query_instruments_tip": "If enabled, query instruments. too much infomation may result in account or ip blacklisted by XTP"
127 }
128 },
129 "config": {
130 "td": {
131 "type": [
132 "stock"
133 ],
134 "settings": [
135 {
136 "key": "account_name",
137 "name": "xtp.account_name",
138 "type": "str",
139 "tip": "xtp.account_name_tip"
140 },
141 {
142 "key": "account_id",
143 "name": "xtp.account_id",
144 "type": "str",
145 "required": true,
146 "primary": true,
147 "tip": "xtp.account_id_tip"
148 },
149 {
150 "key": "password",
151 "name": "xtp.password",
152 "type": "password",
153 "required": true,
154 "tip": "xtp.password_tip"
155 },
156 {
157 "key": "software_key",
158 "name": "xtp.software_key",
159 "type": "str",
160 "required": true,
161 "tip": "xtp.software_key_tip"
162 },
163 {
164 "key": "td_ip",
165 "name": "xtp.td_ip",
166 "type": "str",
167 "required": true,
168 "tip": "xtp.td_ip_tip"
169 },
170 {
171 "key": "td_port",
172 "name": "xtp.td_port",
173 "type": "int",
174 "required": true,
175 "tip": "xtp.td_port_tip"
176 },
177 {
178 "key": "client_id",
179 "name": "xtp.client_id",
180 "type": "int",
181 "required": true,
182 "tip": "xtp.client_id_tip"
183 },
184 {
185 "key": "sync_external_order",
186 "name": "xtp.sync_external_order",
187 "type": "bool",
188 "errMsg": "xtp.sync_external_order_msg",
189 "required": false,
190 "default": false,
191 "tip": "xtp.sync_external_order_tip"
192 },
193 {
194 "key": "recover_order_trade",
195 "name": "xtp.recover_order_trade",
196 "type": "bool",
197 "errMsg": "xtp.recover_order_trade_msg",
198 "required": false,
199 "default": false,
200 "tip": "xtp.recover_order_trade_tip"
201 }
202 ]
203 },
204 "md": {
205 "type": [
206 "stock"
207 ],
208 "settings": [
209 {
210 "key": "account_id",
211 "name": "xtp.account_id",
212 "type": "str",
213 "required": true,
214 "tip": "xtp.account_id_tip",
215 "default": "15011218"
216 },
217 {
218 "key": "password",
219 "name": "xtp.password",
220 "type": "password",
221 "required": true,
222 "tip": "xtp.password_tip",
223 "default": "PsVqy99v"
224 },
225 {
226 "key": "md_ip",
227 "name": "xtp.md_ip",
228 "type": "str",
229 "required": true,
230 "tip": "xtp.md_ip_tip",
231 "default": "119.3.103.38"
232 },
233 {
234 "key": "md_port",
235 "name": "xtp.md_port",
236 "type": "int",
237 "required": true,
238 "tip": "xtp.md_port_tip",
239 "default": 6002
240 },
241 {
242 "key": "protocol",
243 "name": "xtp.protocol",
244 "type": "select",
245 "options": [
246 {
247 "value": "tcp",
248 "label": "tcp"
249 },
250 {
251 "value": "udp",
252 "label": "udp"
253 }
254 ],
255 "required": false,
256 "tip": "xtp.protocol_tip",
257 "default": "tcp"
258 },
259 {
260 "key": "buffer_size",
261 "name": "xtp.buffer_size",
262 "type": "int",
263 "tip": "xtp.buffer_size_tip",
264 "required": false
265 },
266 {
267 "key": "client_id",
268 "name": "xtp.client_id",
269 "type": "int",
270 "required": true,
271 "tip": "xtp.client_id_tip",
272 "default": 23
273 },
274 {
275 "key": "query_instruments",
276 "name": "xtp.query_instruments",
277 "type": "bool",
278 "required": false,
279 "tip": "xtp.query_instruments_tip",
280 "default": false
281 }
282 ]
283 }
284 }
285 }
286}
Broker启动流程
柜台和行情源模块都继承 class BrokerService, 在启动进程时, 会先执行pre_start()做一些启动前的配置处理, 例如, 设置是否从恢复今日委托到内存, 创建Band信道等.
根据pre_start()的设置完成配置处理后, 执行on_start(), 这里一般都是设置柜台链接ip地址和端口, 以及账户密码进行连接.
当进程执行结束退出前, 会执行on_exit(), 这里一般是用于退出登录和内存释放.
1class BrokerService {
2 public:
3 typedef longfist::enums::BrokerState BrokerState;
4
5 explicit BrokerService(BrokerVendor &vendor);
6
7 virtual ~BrokerService() = default;
8
9 virtual void pre_start();
10
11 virtual void on_start();
12
13 virtual void on_exit();
14
15 // 其他属性
16};
TD启动流程
对于同步登陆接口, 在on_start中调用登陆接口后能立刻判断是否登录成功, 登陆成功后, 立即调用恢复Order和Trade的函数, 如果设置跳过了恢复今日委托, 则立即进入Ready状态; 否则在恢复完Order和Trade之后再进入Ready状态, 进入Ready状态后会立即执行req_position和req_account, 查询持仓和资金, 之后就可以进行委托报单操作.
对于异步登陆接口, 需要在登陆响应回调函数中才能知道是否登陆成功, 后续执行与同步登录接口同样的操作.
范例
1// xtp柜台在pre_start配置是否需要恢复今日委托
2void TraderXTP::pre_start() {
3 config_ = nlohmann::json::parse(get_config());
4 SPDLOG_INFO("config: {}", get_config());
5 if (not config_.recover_order_trade) {
6 disable_recover();
7 }
8}
9
10
11// xtp柜台在on_start进行登录
12void TraderXTP::on_start() {
13 if (config_.client_id < 1 or config_.client_id > 99) {
14 SPDLOG_ERROR("client_id must between 1 and 99");
15 }
16 std::string runtime_folder = get_runtime_folder();
17 SPDLOG_INFO("Connecting XTP account {} with tcp://{}:{}", config_.account_id, config_.td_ip, config_.td_port);
18 api_ = XTP::API::TraderApi::CreateTraderApi(config_.client_id, runtime_folder.c_str());
19 api_->RegisterSpi(this);
20 api_->SubscribePublicTopic(XTP_TERT_QUICK);
21 api_->SetSoftwareVersion("1.1.0");
22 api_->SetSoftwareKey(config_.software_key.c_str());
23 session_id_ = api_->Login(config_.td_ip.c_str(), config_.td_port, config_.account_id.c_str(),
24 config_.password.c_str(), XTP_PROTOCOL_TCP);
25 if (session_id_ > 0) {
26 SPDLOG_INFO("Login successfully");
27 req_order_trade(); // 登陆成功后恢复Order和Trade, 恢复完后将TD状态设置成Ready
28 } else {
29 update_broker_state(BrokerState::LoginFailed);
30 SPDLOG_ERROR("Login failed [{}]: {}", api_->GetApiLastError()->error_id, api_->GetApiLastError()->error_msg);
31 }
32}
33
34// 查询恢复Order和Trade
35void TraderXTP::req_order_trade() {
36 if (disable_recover_) {
37 return try_ready(); // 设置跳过恢复则立即设置成Ready
38 }
39
40 XTPQueryOrderReq query_order_param{};
41 int ret = api_->QueryOrders(&query_order_param, session_id_, get_request_id());
42 if (0 != ret) {
43 SPDLOG_ERROR("QueryOrders False: {}", ret);
44 }
45
46 XTPQueryTraderReq query_trade_param{};
47 ret = api_->QueryTrades(&query_trade_param, session_id_, get_request_id());
48 if (0 != ret) {
49 SPDLOG_ERROR("QueryTrades False : {}", ret);
50 }
51}
52
53// 将TD状态设置成Ready
54void TraderXTP::try_ready() {
55 if (BrokerState::Ready == get_state()) {
56 return;
57 }
58
59 SPDLOG_DEBUG("req_order_over_: {}, req_trade_over_: {}", req_order_over_, req_trade_over_);
60 if (disable_recover_ or (req_order_over_ and req_trade_over_)) {
61 update_broker_state(BrokerState::Ready);
62 }
63}
64
65// xtp柜台在on_exit里退出登录
66void TraderXTP::on_exit() {
67 if (api_ != nullptr and session_id_ > 0) {
68 auto result = api_->Logout(session_id_);
69 SPDLOG_INFO("Logout with return code {}", result);
70 }
71}
MD启动流程
MD的登录流程相对简单, 对于同步登陆接口, 直接在on_start里面将MD状态设置成Ready; 对于异步登陆接口, 在登陆响应接口直接将MD状态设置成Ready.
范例
1// xtp行情源在pre_start里申请创建两个页大小为256MB的band信道
2void MarketDataXTP::pre_start() {
3 entrust_band_uid_ = request_band("market-data-band-entrust", 256);
4 transaction_band_uid_ = request_band("market-data-band-transaction", 256);
5}
6
7// xtp行情源在on_start里进行登录
8void MarketDataXTP::on_start() {
9 MDConfiguration config = nlohmann::json::parse(get_config());
10 if (config.client_id < 1 or config.client_id > 99) {
11 SPDLOG_ERROR("client_id must between 1 and 99");
12 }
13 auto md_ip = config.md_ip.c_str();
14 auto account_id = config.account_id.c_str();
15 auto password = config.password.c_str();
16 auto protocol_type = get_xtp_protocol_type(config.protocol);
17 std::string runtime_folder = get_runtime_folder();
18 SPDLOG_INFO("Connecting XTP MD for {} at {}://{}:{}", account_id, config.protocol, md_ip, config.md_port);
19 api_ =
20 XTP::API::QuoteApi::CreateQuoteApi(config.client_id, runtime_folder.c_str(), XTP_LOG_LEVEL::XTP_LOG_LEVEL_INFO);
21 if (config.protocol == "udp") {
22 api_->SetUDPBufferSize(config.buffer_size);
23 }
24 api_->RegisterSpi(this);
25 if (api_->Login(md_ip, config.md_port, account_id, password, protocol_type) == 0) {
26 update_broker_state(BrokerState::LoggedIn);
27 update_broker_state(BrokerState::Ready);
28 SPDLOG_INFO("login success! (account_id) {}", config.account_id);
29 if (config.query_instruments and not check_if_stored_instruments(time::strfnow("%Y%m%d"))) {
30 api_->QueryAllTickers(XTP_EXCHANGE_SH);
31 api_->QueryAllTickers(XTP_EXCHANGE_SZ);
32 api_->QueryAllTickersFullInfo(XTP_EXCHANGE_SH);
33 api_->QueryAllTickersFullInfo(XTP_EXCHANGE_SZ);
34 }
35 } else {
36 update_broker_state(BrokerState::LoginFailed);
37 SPDLOG_ERROR("failed to login, [{}] {}", api_->GetApiLastError()->error_id, api_->GetApiLastError()->error_msg);
38 }
39}
交易柜台对接
交易接口基类
1class Trader : public BrokerService {
2public:
3 explicit Trader(BrokerVendor &vendor) : BrokerService(vendor){};
4
5 virtual longfist::enums::AccountType get_account_type() const = 0;
6
7 // 普通委托报单
8 virtual bool insert_order(const event_ptr &event) = 0;
9
10 // 预埋单报单
11 virtual bool insert_order_trigger(const event_ptr &event);
12
13 // 大宗交易报单
14 virtual bool insert_block_order(const event_ptr &event, const longfist::types::BlockMessage &block_message);
15
16 // 批量委托报单
17 virtual bool insert_batch_orders(const event_ptr &event, const OrderInputs &order_inputs);
18
19 // 算法单添加
20 virtual bool insert_algo_order(const event_ptr &event);
21
22 // 委托撤单(预埋撤单和普通撤单)
23 virtual bool cancel_order(const event_ptr &event) = 0;
24
25 // 预埋单撤单
26 virtual bool cancel_order_trigger(const event_ptr &event);
27
28 // 算法单删除
29 virtual bool cancel_algo_order(const event_ptr &event);
30
31 // 算法单启动/停止
32 virtual bool toggle_algo_order(const event_ptr &event);
33
34 // 查询持仓
35 virtual bool req_position() = 0;
36
37 // 查询资金
38 virtual bool req_account() = 0;
39
40 // 查询预埋单状态
41 virtual bool req_order_trigger();
42
43 // 两融合约查询
44 virtual bool req_contract();
45
46 // 查询算法单(系统未调用, 最后再确认)
47 virtual bool req_algo_order(const event_ptr &event);
48
49 // 查询历史委托
50 virtual bool req_history_order(const event_ptr &event);
51
52 // 查询历史成交
53 virtual bool req_history_trade(const event_ptr &event);
54
55 // 收到master广播的Band数据会触发该回调, 用于订阅band信道
56 virtual void on_band(const event_ptr &event) {}
57
58 // 收到一个TimeKeyValue数据时触发回调
59 virtual void on_time_key_value(const event_ptr &event) {}
60
61 // 收到一个策略的Deregister数据时触发回调, 表明Master检测到该进程已经退出
62 virtual bool on_strategy_exit(const event_ptr &event);
63
64 // TD启动时处理风控信息
65 void on_risk_setting(const longfist::types::RiskSetting &risk_setting);
66
67 // 获取该TD的location uname
68 [[nodiscard]] const std::string &get_account_id() const;
69
70 // 获取写入资金Asset的writer
71 [[nodiscard]] yijinjing::journal::writer_ptr get_asset_writer() const;
72
73 // 获取写入持仓Position的writer
74 [[nodiscard]] yijinjing::journal::writer_ptr get_position_writer() const;
75
76
77 // 启动时第一次获取到的资金Asset写到PUBLIC, 每分钟同步时需要写到SYNC, 调用该函数进行切换
78 void enable_asset_sync();
79
80 // 启动时第一次获取到的持仓Position写到PUBLIC, 每分钟同步时需要写到SYNC, 调用该函数进行切换
81 void enable_positions_sync();
82
83 [[nodiscard]] const OrderMap &get_orders() const;
84
85 [[nodiscard]] bool has_order(uint64_t order_id) const;
86
87 [[nodiscard]] kungfu::state<longfist::types::Order> &get_order(uint64_t order_id);
88
89 [[nodiscard]] const OrderActionMap &get_order_actions() const;
90
91 [[nodiscard]] bool has_order_action(uint64_t action_id) const;
92
93 [[nodiscard]] kungfu::state<longfist::types::OrderAction> &get_order_action(uint64_t action_id);
94
95 [[nodiscard]] const TradeMap &get_trades() const;
96
97 [[nodiscard]] const OrderTriggerMap &get_order_triggers() const;
98
99 [[nodiscard]] bool has_order_trigger(uint64_t trigger_id) const;
100
101 [[nodiscard]] kungfu::state<longfist::types::OrderTrigger> &get_order_trigger(uint64_t trigger_id);
102
103 [[nodiscard]] const OrderTriggerActionMap &get_order_trigger_actions() const;
104
105 [[nodiscard]] bool has_order_trigger_action(uint64_t action_id) const;
106
107 [[nodiscard]] kungfu::state<longfist::types::OrderTriggerAction> &get_order_trigger_action(uint64_t action_id);
108
109 [[nodiscard]] const AlgoOrderMap &get_algo_orders() const;
110
111 [[nodiscard]] bool has_algo_order(uint64_t algo_order_id) const;
112
113 [[nodiscard]] kungfu::state<longfist::types::AlgoOrder> &get_algo_order(uint64_t algo_order_id);
114
115 [[nodiscard]] const AlgoOrderActionMap &get_algo_order_actions() const;
116
117 [[nodiscard]] uint32_t get_risk_uid() const;
118
119 void disable_recover();
120
121 virtual void on_recover(){};
122
123 [[nodiscard]] bool is_sync_account() const { return sync_account_; }
124
125 void enable_sync_account() { sync_account_ = true; }
126
127 void disable_sync_account() { sync_account_ = false; }
128
129 void try_req_account();
130
131protected:
132 bool disable_recover_ = false;
133
134private:
135 bool sync_asset_ = false;
136 bool sync_position_ = false;
137 bool sync_account_ = false;
138 uint32_t risk_uid_ = 0;
139
140 void on_asset_sync();
141
142 void on_position_sync();
143
144 void recover();
145
146 void recover_from_journal();
147
148 void deal_write_frame();
149
150 void deal_read_frame();
151
152 [[nodiscard]] OrderService &get_order_service();
153
154 [[nodiscard]] const OrderService &get_order_service() const;
155
156 [[nodiscard]] OrderTriggerService &get_order_trigger_service();
157
158 [[nodiscard]] const OrderTriggerService &get_order_trigger_service() const;
159
160 [[nodiscard]] AlgoOrderService &get_algo_order_service();
161
162 [[nodiscard]] const AlgoOrderService &get_algo_order_service() const;
163};
主要回调接口
insert_order
普通委托报单
virtual bool insert_order(const event_ptr &event) = 0;
收到委托报单输入OrderInput时会调用该函数, 调用 event->data<OrderInput>() 获取委托内容, 将OrderInput中的待报单信息, 按照柜台api的要求直接填写或者构建所需的消息体后填写, 调用api完成报单发送.
报单完成后TD需要生成一个Order写回给报单的进程, 通知该笔委托的状态.
API的委托报单接口分为同步和异步两种,
- 同步报单: 调用完API下单接口后会立刻获取该委托在柜台服务器的委托编号
需要立刻建立 <API委托号, order_id> 映射关系, 在收到成交推送后根据此映射生成Trade和修改Order状态
- 异步报单: 调用完API下单接口后只能获取是否发送报单信息成功
在API的报单响应回调中才能获取该委托在柜台服务器的委托号, 在报单过程中一般会携带一个本地定义的request_id, 在报单响应中会有request_id和API委托号, 需要在下单时构建好 <request_id, order_id>的映射关系, 在报单响应中根绝request_id找到order_id, 再建立 <API委托号, order_id> 映射关系.
参数
参数 |
类型 |
说明 |
event |
const event_ptr & |
包含待报单信息以及来源和触发时间等信息 |
返回值
类型 |
说明 |
bool |
报单成功返回true, 报单失败返回false |
范例
1// 异步报单, ctp的insert_order实现
2bool TraderCTP::insert_order(const event_ptr &event) {
3 const OrderInput &input = event->data<OrderInput>();
4 SPDLOG_DEBUG("OrderInput: {}", input.to_string());
5
6 int request_id = get_request_id();
7 int error_id = 0;
8 uint64_t orderRef_key = 0;
9
10 CThostFtdcInputOrderField ctp_input{};
11 to_ctp(ctp_input, input);
12 strcpy(ctp_input.BrokerID, config_.broker_id.c_str());
13 strcpy(ctp_input.InvestorID, config_.account_id.c_str());
14 strcpy(ctp_input.OrderRef, std::to_string(++order_ref_).c_str());
15 SPDLOG_DEBUG("CThostFtdcInputOrderField: {}", to_string(ctp_input));
16 error_id = api_->ReqOrderInsert(&ctp_input, request_id);
17 orderRef_key = get_orderRef_key(front_id_, session_id_, ctp_input.OrderRef);
18
19 auto nano = time::now_in_nano();
20 auto writer = get_writer(event->source());
21 Order &order = writer->open_data<Order>(event->gen_time());
22 order_from_input(input, order);
23 order.insert_time = nano;
24 order.update_time = nano;
25 order.time_condition = from_ctp_time_condition(ctp_input.TimeCondition);
26
27 if (error_id != 0) {
28 order.error_id = error_id;
29 order.status = OrderStatus::Error;
30 }
31
32 SPDLOG_DEBUG("Order: {}", order.to_string());
33 writer->close_data();
34 map_request_id_to_kf_order_id_.insert_or_assign(request_id, uint64_t(input.order_id));
35 map_OrderRefKey_to_kf_order_id_.insert_or_assign(orderRef_key, uint64_t(input.order_id));
36 map_kf_order_id_to_OrderRef_.insert_or_assign(input.order_id, ctp_input.OrderRef);
37 return error_id == 0;
38}
39
40bool TraderCTP::custom_OnRtnOrder(const CThostFtdcOrderField &ctp_order) {
41 SPDLOG_DEBUG("CThostFtdcOrderField: {}", to_string(ctp_order));
42
43 const std::string str_ExchangeID_OrderLocalID =
44 make_ExchangeID_OrderSysID(ctp_order.ExchangeID, ctp_order.OrderLocalID);
45 const std::string str_ExchangeID_OrderSysID = make_ExchangeID_OrderSysID(ctp_order.ExchangeID, ctp_order.OrderSysID);
46 // orderRef_key 作为 request_id 中间桥梁的角色, 辅助链接 API委托和order_id
47 uint64_t orderRef_key = get_orderRef_key(ctp_order.FrontID, ctp_order.SessionID, ctp_order.OrderRef);
48 auto OrderRefKey_iter = map_OrderRefKey_to_kf_order_id_.find(orderRef_key);
49 auto ExchangeID_OrderSysID_iter = map_ExchangeID_OrderSysID_to_kf_order_id_.find(str_ExchangeID_OrderSysID);
50 auto ExchangeID_OrderLocalID_iter = map_ExchangeID_OrderSysID_to_kf_order_id_.find(str_ExchangeID_OrderLocalID);
51
52 // 系统外订单信息
53 if (OrderRefKey_iter == map_OrderRefKey_to_kf_order_id_.end() and
54 ExchangeID_OrderSysID_iter == map_ExchangeID_OrderSysID_to_kf_order_id_.end() and
55 ExchangeID_OrderLocalID_iter == map_ExchangeID_OrderSysID_to_kf_order_id_.end()) {
56 SPDLOG_WARN("external Order with ExchangeID: {}, OrderSysID: {}, OrderLocalID: {}", ctp_order.ExchangeID,
57 ctp_order.OrderSysID, ctp_order.OrderLocalID);
58 return generate_external_order(ctp_order);
59 }
60
61 uint64_t kf_order_id = 0;
62 if (strlen(ctp_order.OrderSysID) != 0 and
63 ExchangeID_OrderSysID_iter != map_ExchangeID_OrderSysID_to_kf_order_id_.end()) {
64 // 已经存在 <ExchangeID_OrderSysID, order_id>, 表示撤单或者是重新查询委托状态信息
65 kf_order_id = ExchangeID_OrderSysID_iter->second;
66 } else if (OrderRefKey_iter != map_OrderRefKey_to_kf_order_id_.end()) {
67 // 已经存在 <OrderRefKey, order_id>, 表示本次连接下的单
68 kf_order_id = OrderRefKey_iter->second;
69 } else if (ExchangeID_OrderLocalID_iter != map_ExchangeID_OrderSysID_to_kf_order_id_.end()) {
70 // 表示系统外委托的第二次推送, 第一次推送时只有 OrderLocalID 没有 OrderLocalID
71 kf_order_id = ExchangeID_OrderLocalID_iter->second;
72 } else {
73 SPDLOG_ERROR("invalidate CThostFtdcOrderField: {}", to_string(ctp_order));
74 return false;
75 }
76
77 // 该笔报单请求首次到达CTP, 风控通过后返回的第1个OnRtnOrder回报, 此时因为还没有报入到交易所,
78 // 所以回报中OrderSysID为空
79 if (strlen(ctp_order.OrderSysID) != 0) {
80 // 建立 <API委托号, order_id> 映射关系
81 map_kf_order_id_to_OrderSysID_.insert_or_assign(kf_order_id, ctp_order.OrderSysID);
82 map_ExchangeID_OrderSysID_to_kf_order_id_.insert_or_assign(str_ExchangeID_OrderSysID, kf_order_id);
83 }
84
85 if (not has_order(kf_order_id)) {
86 SPDLOG_WARN("no order_id {} in orders_", kf_order_id);
87 return generate_external_order(ctp_order);
88 }
89
90 auto &order_state = get_order(kf_order_id);
91
92 // 订单状态已经处于最终状态了就不再改动, 重连查询的时候会触发该情况
93 if (is_final_status(order_state.data.status) and order_state.data.status != longfist::enums::OrderStatus::Lost and
94 order_state.data.status != longfist::enums::OrderStatus::Cancelling) {
95 return true;
96 }
97
98 from_ctp(ctp_order, order_state.data);
99 order_state.data.update_time = time::now_in_nano();
100 if (has_writer(order_state.dest)) {
101 write_to(order_state.data, order_state.dest);
102 } else {
103 ++try_write_to_order_count;
104 try_write_to(order_state.data, order_state.dest, [&]() {
105 --try_write_to_order_count;
106 try_ready();
107 });
108 }
109 try_deal_trade(str_ExchangeID_OrderSysID);
110 return true;
111}
112
113
114// 同步报单, xtp的insert_order实现
115bool TraderXTP::insert_order(const event_ptr &event) {
116 const OrderInput &input = event->data<OrderInput>();
117 SPDLOG_DEBUG("OrderInput: {}", input.to_string());
118 XTPOrderInsertInfo xtp_input = {};
119 to_xtp(xtp_input, input);
120
121 SPDLOG_DEBUG("XTPOrderInsertInfo: {}", to_string(xtp_input));
122 uint64_t order_xtp_id = api_->InsertOrder(&xtp_input, session_id_);
123 auto success = order_xtp_id != 0;
124
125 auto nano = yijinjing::time::now_in_nano();
126 auto writer = get_writer(event->source());
127 Order &order = writer->open_data<Order>(event->gen_time());
128 order_from_input(input, order);
129 order.external_order_id = std::to_string(order_xtp_id).c_str();
130 order.insert_time = nano;
131 order.update_time = nano;
132
133 if (success) {
134 // 报单成功后直接返回 API委托号, 可以立刻建立 <API委托号, order_id> 映射关系
135 map_kf_to_xtp_order_id_.emplace(uint64_t(input.order_id), order_xtp_id);
136 map_xtp_to_kf_order_id_.emplace(order_xtp_id, uint64_t(input.order_id));
137 } else {
138 auto error_info = api_->GetApiLastError();
139 order.error_id = error_info->error_id;
140 order.error_msg = error_info->error_msg;
141 order.status = OrderStatus::Error;
142 }
143
144 SPDLOG_DEBUG("Order: {}", order.to_string());
145 writer->close_data();
146 if (not success) {
147 SPDLOG_ERROR("fail to insert order {}, error id {}, {}", to_string(xtp_input), (int)order.error_id,
148 order.error_msg);
149 }
150 return success;
151}
insert_order_trigger
预埋单下单报单
virtual bool insert_order_trigger(const event_ptr &event);
收到预埋单输入OrderTriggerInput时会调用该函数, 调用 event->data<OrderTriggerInput>() 获取预埋单内容.
将OrderTriggerInput中的待报预埋单信息, 按照柜台api的要求直接填写或者构建所需的消息体后填写, 调用api完成报预埋单发送.
报单完成后TD需要生成一个OrderTrigger写回给报单的进程, 通知预埋单的状态.
参数
参数 |
类型 |
说明 |
event |
const event_ptr & |
包含待报预埋单信息以及来源和触发时间等信息 |
返回值
类型 |
说明 |
bool |
预埋报单成功返回true, 预埋报单失败返回false |
范例
1// ctp的预埋下单为异步接口, 在响应接口才能获取到预埋单委托号 ParkedOrderID
2bool TraderCTP::insert_order_trigger(const event_ptr &event) {
3 const OrderTriggerInput &trigger_input = event->data<OrderTriggerInput>();
4 SPDLOG_DEBUG("OrderTriggerInput: {}", trigger_input.to_string());
5
6 int request_id = get_request_id();
7 int error_id = 0;
8 uint64_t orderRef_key = 0;
9
10 // 预埋单
11 CThostFtdcParkedOrderField ctp_parked_input{};
12 to_ctp_parked(ctp_parked_input, trigger_input);
13 strncpy(ctp_parked_input.GTDDate,
14 yijinjing::time::strftime(trigger_input.insert_time, KUNGFU_TRADING_DAY_FORMAT).c_str(), 9);
15 SPDLOG_DEBUG("insert_order ctp_parked_input. trigger.insert_time={}, GTDDate: {}", trigger_input.insert_time,
16 ctp_parked_input.GTDDate);
17 strcpy(ctp_parked_input.BrokerID, config_.broker_id.c_str());
18 strcpy(ctp_parked_input.InvestorID, config_.account_id.c_str());
19 strcpy(ctp_parked_input.OrderRef, std::to_string(++order_ref_).c_str());
20 ctp_parked_input.RequestID = request_id;
21 SPDLOG_DEBUG("CThostFtdcParkedOrderField: {}", to_string(ctp_parked_input));
22 error_id = api_->ReqParkedOrderInsert(&ctp_parked_input, request_id);
23 orderRef_key = get_orderRef_key(front_id_, session_id_, ctp_parked_input.OrderRef);
24
25 auto nano = time::now_in_nano();
26 auto writer = get_writer(event->source());
27 OrderTrigger &trigger = writer->open_data<OrderTrigger>(event->gen_time());
28 order_trigger_from_input(trigger_input, trigger);
29 trigger.insert_time = nano;
30 trigger.update_time = nano;
31 trigger.time_condition = from_ctp_time_condition(ctp_parked_input.TimeCondition);
32
33 if (error_id != 0) {
34 trigger.error_id = error_id;
35 trigger.status = OrderStatus::Error;
36 }
37
38 SPDLOG_DEBUG("OrderTrigger: {}", trigger.to_string());
39 writer->close_data();
40 map_request_id_to_kf_order_id_.insert_or_assign(request_id, uint64_t(trigger.trigger_id));
41 map_OrderRefKey_to_kf_order_id_.insert_or_assign(orderRef_key, uint64_t(trigger.trigger_id));
42 return error_id == 0;
43}
44
45// ctp预埋下单响应
46bool TraderCTP::custom_OnRspParkedOrderInsert(const CThostFtdcParkedOrderField &ParkedOrder,
47 const CThostFtdcRspInfoField &RspInfo, int nRequestID, bool bIsLast) {
48 SPDLOG_DEBUG("CThostFtdcParkedOrderField: {}", to_string(ParkedOrder));
49 SPDLOG_DEBUG("CThostFtdcRspInfoField: {}", to_string(RspInfo));
50 SPDLOG_DEBUG("nRequestID: {}, bIsLast: {}", nRequestID, bIsLast);
51
52 auto trigger_id_iter = map_request_id_to_kf_order_id_.find(nRequestID);
53 if (trigger_id_iter == map_request_id_to_kf_order_id_.end()) {
54 SPDLOG_ERROR("CANNOT FIND trigger_id of {} in map_request_id_to_kf_order_id_", nRequestID);
55 return false;
56 }
57
58 auto trigger_id = trigger_id_iter->second;
59 if (not has_order_trigger(trigger_id)) {
60 SPDLOG_ERROR("CANNOT FIND tigger_id {} in triggers_", trigger_id);
61 return false;
62 }
63
64 auto &trigger_state = get_order_trigger(trigger_id);
65 map_trigger_id_to_ParkedOrderID_.insert_or_assign(trigger_id,
66 std::pair<std::string, bool>{ParkedOrder.ParkedOrderID, false});
67 map_ParkedOrderId_to_trigger_id_.insert_or_assign(ParkedOrder.ParkedOrderID, trigger_id);
68 trigger_state.data.status = parked_status_to_trigger_status(ParkedOrder.Status);
69 strncpy(trigger_state.data.external_trigger_id, ParkedOrder.ParkedOrderID, strlen(ParkedOrder.ParkedOrderID));
70 trigger_state.data.error_id = RspInfo.ErrorID;
71 const std::string msg = gbk2utf8(RspInfo.ErrorMsg);
72 strncpy(trigger_state.data.error_msg, msg.c_str(), msg.length());
73 trigger_state.data.update_time = time::now_in_nano();
74
75 if (RspInfo.ErrorID != 0) {
76 trigger_state.data.status = OrderStatus::Error;
77 SPDLOG_ERROR("failed to insert order, ErrorId: {} ErrorMsg: {}, parked_order: {}", RspInfo.ErrorID,
78 gbk2utf8(RspInfo.ErrorMsg), to_string(ParkedOrder));
79 }
80
81 try_write_to(trigger_state.data, trigger_state.dest);
82 SPDLOG_DEBUG("OrderTrigger: {}", trigger_state.data.to_string());
83
84 return true;
85}
备注
预埋下单一般是在休盘时间报单, 委托信息挂在ctp服务器上, 当开盘时自动将委托信息提交到交易所.
在预埋单开盘触发前可以撤销, 但是开盘触发后, 不论下单是否成功, ctp都不会推送预埋单的状态信息, 需要手动查询才能获取预埋单的最新状态信息.
insert_block_order
大宗交易报单
virtual bool insert_block_order(const event_ptr &event, const longfist::types::BlockMessage &block_message);
大宗交易报单和普通委托报单类似, 委托信息存在event->data<OrderInput>()里, 区别是多了和大宗交易相关的额外信息BlockMessage, 按照柜台api的要求直接填写或者构建所需的消息体后填写, 调用api完成报大宗单发送.
报单完成后TD需要生成一个Order写回给报单的进程, 通知该笔委托的状态.
参数
参数 |
类型 |
说明 |
event |
const event_ptr & |
包含待报大宗单信息以及来源和触发时间等信息 |
block_message |
const longfist::types::BlockMessage & |
包含待报对手方席位号以及成交约定号等信息 |
返回值
类型 |
说明 |
bool |
大宗报单成功返回true, 大宗报单失败返回false |
范例
1// 金证股票柜台的大宗交易报单
2bool TraderMaCli::insert_block_order(const event_ptr &event, const longfist::types::BlockMessage &block_message) {
3 const OrderInput &order_input = event->data<OrderInput>();
4 SPDLOG_DEBUG("OrderInput Message : {}", order_input.to_string());
5
6 const int64_t request_id = get_request_id();
7 auto nano = kungfu::yijinjing::time::now_in_nano();
8 auto writer = get_writer(event->source());
9 Order &order = writer->open_data<Order>(event->gen_time());
10 order_from_input(order_input, order);
11 set_offset(order);
12 order.status = OrderStatus::Pending;
13 order.insert_time = nano;
14 order.update_time = nano;
15
16 map_request_to_order_.emplace(request_id, order.order_id);
17
18 CReqStkOrderField stField{};
19 auto iter_maStkUserLoginFiled = map_maszStkbd_to_maStkUserLoginFiled_.find(
20 kf_to_macli_exchange_id(order_input.exchange_id, stField.szStkbd));
21 if (iter_maStkUserLoginFiled == map_maszStkbd_to_maStkUserLoginFiled_.end()) {
22 const std::string msg = "找不到和交易板块" + std::string(stField.szStkbd) + "对应的用户下单信息, 无法下单";
23 SPDLOG_ERROR("找不到和交易板块{}对应的用户下单信息, 无法下单", stField.szStkbd);
24 order.status = OrderStatus::Error;
25 order.error_id = -1;
26 strncpy(order.error_msg, msg.c_str(), msg.length());
27 SPDLOG_DEBUG("Order: {}", order.to_string());
28 writer->close_data();
29 return false;
30 }
31 const CRspStkUserLoginField &login_filed = iter_maStkUserLoginFiled->second;
32 stField.llCuacctCode = login_filed.llCuacctCode;
33 stField.llCustCode = login_filed.llCustCode;
34 strncpy(stField.szTrdacct, login_filed.szStkTrdacct, 20);
35 strncpy(stField.szStkCode, order_input.instrument_id, 8);
36 strncpy(stField.szOrderPrice, std::to_string(order_input.limit_price).c_str(), 21);
37 stField.llOrderQty = order_input.volume / get_vol_multi(order_input.exchange_id,
38 get_instrument_type(order_input.exchange_id,
39 order_input.instrument_id));
40 if (order_input.instrument_type == InstrumentType::Repo) {
41 if (order_input.side != Side::Sell) {
42 order.status = OrderStatus::Error;
43 order.error_id = -1;
44 strncpy(order.error_msg, "逆回购只能以Sell方式下单", ERROR_MSG_LEN);
45 SPDLOG_DEBUG("Order: {}", order.to_string());
46 writer->close_data();
47 return false;
48 }
49 stField.iStkBiz = 165;
50 } else {
51 stField.iStkBiz = kf_to_macli_iStkBiz(order_input.side);
52 }
53 stField.iStkBizAction = kf_to_macli_iStkBizAction(order_input);
54 strncpy(stField.szClientInfo, cust_trace_info_.c_str(), 256);
55 stField.chSecurityLevel = '1';
56 int iOrderBsn = int(order_input.order_id & 0x0000FFFF);
57 stField.iOrderBsn = iOrderBsn;
58 stField.iCuacctSn = iCuacctSn_;
59
60 /// 大宗交易需要额外填写的信息
61 if (order_input.block_id != 0) {
62 stField.llMatchNo = block_message.match_number; // 成交约定号
63 std::string str_REDUCT = block_message.is_specific ? "REDUCT=1" : "REDUCT=0";
64 stField.iStkBiz = kf_to_macli_block_iStkBiz(order_input);
65 strncpy(stField.szOpptStkpbu, block_message.opponent_seat, 8);
66 strncpy(stField.szOrderText, str_REDUCT.c_str(), 256);
67 }
68
69 SPDLOG_DEBUG("CReqStkOrderField: {}", to_string(stField));
70 int ret = api_->ReqOrder(&stField, request_id);
71 SPDLOG_DEBUG("ReqOrder ret: {}", ret);
72 if (0 != ret) {
73 const std::string msg = gbk2utf8(api_->GetLastErrorText());
74 SPDLOG_ERROR("下单失败, return code: {}, error msg: {}", ret, msg);
75 order.status = OrderStatus::Error;
76 order.error_id = ret;
77 strncpy(order.error_msg, msg.c_str(), msg.length());
78 }
79
80 SPDLOG_DEBUG("Order: {}", order.to_string());
81 writer->close_data();
82 return 0 == ret;
83}
备注
大部分柜台的大宗交易报单和普通委托报单是同一个委托函数, 委托响应函数也相同, 区别只是是否填写了大宗交易相关的信息: 对手方席号, 成交约定号, 是否受限(特定)股份
insert_batch_orders
批量委托报单
virtual bool insert_batch_orders(const event_ptr &event, const OrderInputs &order_inputs);
有的柜台可能会有流控限制, 发送消息的数量有限制, 如果需要一次性报很多笔委托, 需要用到批量委托接口, 待报批量单信息将会存在order_inputs中, 按照柜台api的要求直接填写或者构建所需的消息体后填写, 调用api完成报批量单发送.
参数
参数 |
类型 |
说明 |
event |
const event_ptr & |
包含待报批量单来源和触发时间等信息 |
order_inputs |
const OrderInputs & |
包含待报批量单的信息 |
返回值
类型 |
说明 |
bool |
批量报单成功返回true, 批量报单失败返回false |
范例
1// 华锐股票柜台批量委托报单
2bool TraderAtp::insert_batch_orders(const event_ptr &event, const broker::OrderInputs &order_inputs) {
3 SPDLOG_DEBUG("insert_batch_orders");
4 auto writer = get_writer(event->source());
5 std::vector<uint64_t> orderids{};
6 ATPReqBatchCashAuctionOrderMsg msg{};
7 int64_t nano = time::now_in_nano();
8 OrderInput first_order = order_inputs.front();
9 for (const OrderInput &order_input : order_inputs) {
10 APIBatchCashAuctionOrderUnit unit{};
11 strcpy(unit.security_id, order_input.instrument_id);
12 kf_exchange_2_hr_market(order_input.exchange_id, unit.market_id);
13 kf_side_2_hr_side(order_input.side, unit.side);
14 unit.order_qty = ATPTradeAPI::DoubleExpandToInt(
15 double(order_input.volume) /
16 get_vol_multi(order_input.instrument_id, order_input.exchange_id, order_input.instrument_type),
17 2);
18 unit.price = ATPTradeAPI::DoubleExpandToInt(order_input.limit_price, 4);
19 kf_price_type_2_hr_order_type(order_input.price_type, unit.order_type);
20 kf_exchange_2_hr_market(order_input.exchange_id, msg.market_id);
21 msg.order_array.push_back(unit);
22 SPDLOG_DEBUG("APIBatchCashAuctionOrderUnit: {}", to_string(unit));
23
24 auto &order = writer->open_data<Order>(event->gen_time());
25 order_from_input(order_input, order);
26 order.insert_time = nano;
27 order.update_time = nano;
28 order.status = OrderStatus::Pending;
29 SPDLOG_DEBUG("Order: {}", order.to_string());
30 writer->close_data();
31 orderids.push_back(order.order_id);
32 }
33
34 strcpy(msg.security_id, first_order.instrument_id);
35 kf_exchange_2_hr_market(first_order.exchange_id,
36 msg.market_id);
37 msg.side = ATPSideConst::kBuy;
38 msg.order_qty = ATPTradeAPI::DoubleExpandToInt(100, 2);
39 msg.price = ATPTradeAPI::DoubleExpandToInt(10, 4);
40 msg.order_type = ATPOrdTypeConst::kOptimalFiveLevelFullDealTransferCancel;
41
42 strcpy(msg.cust_id, cust_id_.c_str());
43 strcpy(msg.fund_account_id, td_config_.fund_account_id.c_str());
44 strcpy(msg.branch_id, td_config_.branch_id.c_str());
45 msg.client_seq_id = get_client_seq_id();
46 msg.order_way = order_way_;
47 strcpy(msg.password, str_cipher_.c_str());
48 msg.client_feature_code = str_trace_info_;
49
50 switch (msg.market_id) {
51 case ATPMarketIDConst::kShangHai:
52 strcpy(msg.account_id, sh_account_id_.c_str());
53 break;
54 case ATPMarketIDConst::kShenZhen:
55 strcpy(msg.account_id, sz_account_id_.c_str());
56 break;
57 default:
58 SPDLOG_ERROR("Invalidated market_id : {}", msg.market_id);
59 }
60 msg.batch_type = ATPBatchTypeConst::kBatch;
61 // 将华锐批量成交的id与kongfu的批量orderid相对应, 先添加map信息, 防止回调回来过快找不到对应的order_ids
62 map_client_to_orderids_.emplace(msg.client_seq_id, orderids);
63 SPDLOG_DEBUG("ATPReqBatchCashAuctionOrderMsg: {}", to_string(msg));
64 int ret = atp_trader_api_ptr_->ReqBatchCashAuctionOrder(&msg);
65 SPDLOG_DEBUG("ReqBatchCashAuctionOrder return code : {} , return message : {}", ret,
66 map_error_code.try_emplace(ret).first->second);
67 if (ret != ErrorCode::kSuccess) {
68 SPDLOG_ERROR("FAILED TO INSERT ORDER !");
69 return false;
70 }
71 return true;
72}
73
74// 华锐股票柜台批量委托回调
75bool TraderAtp::custom_OnRspOrderStatusInternalAck(const BufferATPRspOrderStatusAckMsg &internal_ack) {
76 // 优先检测是否属于批量单
77 auto order_ids_iter = map_client_to_orderids_.find(internal_ack.client_seq_id);
78 if (order_ids_iter != map_client_to_orderids_.end()) {
79 std::vector<uint64_t> &orderids = order_ids_iter->second;
80 if (orderids.empty()) {
81 SPDLOG_ERROR("orderids of client_seq_id: {} is empty", internal_ack.client_seq_id);
82 return false;
83 }
84 uint64_t order_id = orderids.front();
85 map_kf_orderid_to_hr_cl_ord_no_.emplace(order_id, internal_ack.cl_ord_no);
86 map_hr_cl_ord_no_to_kf_orderid_.emplace(internal_ack.cl_ord_no, order_id);
87 orderids.erase(orderids.begin());
88
89 if (not has_order(order_id)) {
90 SPDLOG_WARN("order_id: {} not in orders_", order_id);
91 generate_real_time_external_order(internal_ack);
92 return false;
93 }
94
95 auto &order_state = get_order(order_id);
96
97 if (not is_final_status(order_state.data.status) or order_state.data.status == OrderStatus::Lost) {
98 hr_order_status_ack_2_kf_order(internal_ack, order_state.data);
99 // const std::string str_cl_ord_no = std::to_string(internal_ack.cl_ord_no);
100 // strncpy(order_state.data.external_order_id, str_cl_ord_no.c_str(), str_cl_ord_no.length());
101 if (order_state.data.status == OrderStatus::Error) {
102 order_state.data.error_id = internal_ack.reject_reason_code;
103 strncpy(order_state.data.error_msg,
104 map_error_code.try_emplace(internal_ack.reject_reason_code).first->second.c_str(), ERROR_MSG_LEN);
105 }
106 order_state.data.update_time = time::now_in_nano();
107 if (OrderStatus ::Pending != order_state.data.status and has_writer(order_state.dest)) {
108 get_writer(order_state.dest)->write(order_state.data.update_time, order_state.data);
109 } else {
110 SPDLOG_WARN("no writer of {}:{}", order_state.dest, get_vendor().get_location_uname(order_state.dest));
111 SPDLOG_WARN("batch Order: {}", order_state.data.to_string());
112 }
113 SPDLOG_DEBUG("batch Order:{}", order_state.data.to_string());
114 }
115 try_deal_TradeERMsgs_(internal_ack.cl_ord_no);
116 return false;
117 }
118 // 余下为普通委托处理代码
119}
cancel_order
普通撤单和预埋撤单
virtual bool cancel_order(const event_ptr &event) = 0;
event->data<OrderAction>()中有一个OrderActionFlag枚举值, 表明是普通撤单还是预埋撤单.
按照柜台api的要求直接填写或者构建所需的消息体后填写, 调用api完成撤单发送.
若撤单失败, 需要写一个OrderActionError给调用撤单的进程, 说明失败原因.
参数
参数 |
类型 |
说明 |
event |
const event_ptr & |
包含待撤单信息以及来源和触发时间等信息 |
返回值
类型 |
说明 |
bool |
撤单成功返回true, 撤单失败返回false |
范例
1// CTP撤单实现
2bool TraderCTP::cancel_order(const event_ptr &event) {
3 const OrderAction &action = event->data<OrderAction>();
4 SPDLOG_DEBUG("order_action: {}", action.to_string());
5 int request_id = get_request_id();
6 int error_id = 0;
7
8 auto fn_generate_error = [&](const std::string &msg) -> bool {
9 auto writer = get_writer(event->source());
10 OrderActionError &error = writer->open_data<OrderActionError>(event->gen_time());
11 error.error_id = -1;
12 error.order_id = action.order_id;
13 error.order_action_id = action.order_action_id;
14 strncpy(error.error_msg, msg.c_str(), msg.length());
15 SPDLOG_ERROR("OrderActionError: {}", error.to_string());
16 writer->close_data();
17 return false;
18 };
19
20 auto OrderSysID_iter = map_kf_order_id_to_OrderSysID_.find(action.order_id);
21 auto OrderRef_iter = map_kf_order_id_to_OrderRef_.find(action.order_id);
22 if (OrderSysID_iter == map_kf_order_id_to_OrderSysID_.end() and OrderRef_iter == map_kf_order_id_to_OrderRef_.end()) {
23 const std::string msg = fmt::format("FAILED TO CANCEL order {}, can't find related ctp order id", action.order_id);
24 return fn_generate_error(msg);
25 }
26
27 if (not has_order(action.order_id)) {
28 SPDLOG_ERROR("no kf_order_id {} in orders_", action.order_id);
29 const std::string msg = fmt::format("{} not in orders_, try again later!", action.order_id);
30 return fn_generate_error(msg);
31 }
32
33 auto &order_state = get_order(action.order_id);
34 map_request_id_to_kf_order_id_.insert_or_assign(request_id, action.order_id);
35 map_request_id_to_kf_action_id_.insert_or_assign(request_id, action.order_action_id);
36
37 // 普通撤单
38 if (action.action_flag == OrderActionFlag::Cancel) {
39 CThostFtdcInputOrderActionField ctp_action{};
40 strcpy(ctp_action.BrokerID, config_.broker_id.c_str());
41 strcpy(ctp_action.InvestorID, config_.account_id.c_str());
42 ctp_action.FrontID = front_id_;
43 ctp_action.SessionID = session_id_;
44 ctp_action.ActionFlag = THOST_FTDC_AF_Delete;
45 strcpy(ctp_action.InstrumentID, order_state.data.instrument_id);
46 strcpy(ctp_action.ExchangeID, order_state.data.exchange_id);
47 strcpy(ctp_action.OrderSysID, order_state.data.external_order_id);
48 if (OrderRef_iter != map_kf_order_id_to_OrderRef_.end()) {
49 strncpy(ctp_action.OrderRef, OrderRef_iter->second.c_str(), OrderRef_iter->second.length());
50 }
51 SPDLOG_DEBUG("CThostFtdcInputOrderActionField: {}", to_string(ctp_action));
52 error_id = api_->ReqOrderAction(&ctp_action, request_id);
53
54 if (not is_final_status(order_state.data.status)) {
55 order_state.data.status = OrderStatus::Cancelling;
56 try_write_to(order_state.data, order_state.dest);
57 }
58 SPDLOG_DEBUG("Order: {}", order_state.data.to_string());
59 return error_id == 0;
60 }
61
62 // 预埋撤单
63 if (action.action_flag == OrderActionFlag::TriggerCancel) {
64 CThostFtdcParkedOrderActionField ctp_parked_input_action{};
65 strcpy(ctp_parked_input_action.InstrumentID, order_state.data.instrument_id);
66 strcpy(ctp_parked_input_action.ExchangeID, order_state.data.exchange_id);
67 ctp_parked_input_action.ActionFlag = THOST_FTDC_AF_Delete;
68 strcpy(ctp_parked_input_action.BrokerID, config_.broker_id.c_str());
69 strcpy(ctp_parked_input_action.InvestorID, config_.account_id.c_str());
70 strcpy(ctp_parked_input_action.UserID, config_.account_id.c_str());
71 strcpy(ctp_parked_input_action.OrderSysID, order_state.data.external_order_id);
72 if (OrderRef_iter != map_kf_order_id_to_OrderRef_.end()) {
73 strncpy(ctp_parked_input_action.OrderRef, OrderRef_iter->second.c_str(), OrderRef_iter->second.length());
74 }
75 SPDLOG_DEBUG("CThostFtdcParkedOrderActionField: {}", to_string(ctp_parked_input_action));
76 error_id = api_->ReqParkedOrderAction(&ctp_parked_input_action, request_id);
77
78 auto nano = time::now_in_nano();
79 auto writer = get_writer(event->source());
80 OrderTrigger &trigger = writer->open_data<OrderTrigger>(event->gen_time());
81 trigger.trigger_id = action.order_action_id;
82 order_trigger_from_order(order_state.data, trigger);
83 trigger.insert_time = nano;
84 trigger.update_time = nano;
85
86 if (error_id != 0) {
87 trigger.error_id = error_id;
88 trigger.status = OrderStatus::Error;
89 }
90
91 SPDLOG_DEBUG("OrderTrigger: {}", trigger.to_string());
92 writer->close_data();
93
94 return error_id == 0;
95 }
96
97 SPDLOG_ERROR("Unrecognized action_flag: {}", action.action_flag);
98
99 return false;
100}
备注
普通撤单会触发委托推送, 和普通下单类似.
预埋撤单是在休盘时间提交, 开盘后立刻撤单, 同预埋下单一样, 触发前可以撤销, 触发后需要手动查询.
cancel_order_trigger
预埋单撤销
virtual bool cancel_order_trigger(const event_ptr &event);
根据event->data<OrderTriggerAction>()的trigger_id找到对应的预埋单委托号, 按照柜台api的要求直接填写或者构建所需的消息体后填写, 调用api完成撤单发送.
如果撤单失败, 需要写一个OrderTriggerActionError给调用报单的进程, 说明失败原因.
参数
参数 |
类型 |
说明 |
event |
const event_ptr & |
包含待撤预埋单信息以及来源和触发时间等信息 |
返回值
类型 |
说明 |
bool |
撤预埋单成功返回true, 撤预埋单失败返回false |
范例
1bool TraderCTP::cancel_order_trigger(const event_ptr &event) {
2 const OrderTriggerAction &action = event->data<OrderTriggerAction>();
3 SPDLOG_DEBUG("OrderTriggerAction: {}", action.to_string());
4
5 auto parked_id_iter = map_trigger_id_to_ParkedOrderID_.find(action.trigger_id);
6 if (parked_id_iter == map_trigger_id_to_ParkedOrderID_.end()) {
7 SPDLOG_ERROR("CANNOT FIND trigger_id {} in map_trigger_id_to_ParkedOrderID_", action.trigger_id);
8 return false;
9 }
10
11 const std::pair<std::string, bool> &str_parked_pair = parked_id_iter->second;
12 int request_id = get_request_id();
13 int error_id = 0;
14
15 auto fn_generate_error = [&](int32_t error_id, const std::string &msg) -> bool {
16 auto writer = get_writer(event->source());
17 OrderTriggerActionError &error = writer->open_data<OrderTriggerActionError>(event->gen_time());
18 error.error_id = error_id;
19 error.trigger_id = action.trigger_id;
20 error.order_trigger_action_id = action.order_trigger_action_id;
21 strncpy(error.external_trigger_id, str_parked_pair.first.c_str(), str_parked_pair.first.length());
22 strncpy(error.error_msg, msg.c_str(), msg.length());
23 error.insert_time = time::now_in_nano();
24 SPDLOG_ERROR("OrderTriggerActionError: {}", error.to_string());
25 writer->close_data();
26 return false;
27 };
28
29 // ParkedOrderId 是空的说明下预埋单的时候是失败的, 响应函数返回的ParkedOrderId是空
30 if (str_parked_pair.first.empty()) {
31 return fn_generate_error(-1, "待删除的OrderTrigger没有对应的ParkedId");
32 }
33
34 if (str_parked_pair.second) {
35 // 删除预埋撤单
36 CThostFtdcRemoveParkedOrderActionField ctp_remove_parked_order_action{};
37 strcpy(ctp_remove_parked_order_action.BrokerID, config_.broker_id.c_str());
38 strcpy(ctp_remove_parked_order_action.InvestorID, config_.account_id.c_str());
39 strcpy(ctp_remove_parked_order_action.InvestUnitID, config_.account_id.c_str());
40 strcpy(ctp_remove_parked_order_action.ParkedOrderActionID, str_parked_pair.first.c_str()); /// 预埋撤单编号
41 SPDLOG_DEBUG("CThostFtdcRemoveParkedOrderActionField: {}", to_string(ctp_remove_parked_order_action));
42 error_id = api_->ReqRemoveParkedOrderAction(&ctp_remove_parked_order_action, request_id);
43 } else {
44 // 删除预埋下单
45 CThostFtdcRemoveParkedOrderField ctp_remove_parked_order{};
46 strcpy(ctp_remove_parked_order.BrokerID, config_.broker_id.c_str());
47 strcpy(ctp_remove_parked_order.InvestorID, config_.account_id.c_str());
48 strcpy(ctp_remove_parked_order.InvestUnitID, config_.account_id.c_str());
49 strcpy(ctp_remove_parked_order.ParkedOrderID, str_parked_pair.first.c_str());
50 SPDLOG_DEBUG("CThostFtdcRemoveParkedOrderField: {}", to_string(ctp_remove_parked_order));
51 error_id = api_->ReqRemoveParkedOrder(&ctp_remove_parked_order, request_id);
52 }
53
54 if (error_id != 0) {
55 return fn_generate_error(error_id, "删除预埋单出错");
56 }
57
58 auto &trigger_state = get_order_trigger(action.trigger_id);
59 trigger_state.data.update_time = time::now_in_nano();
60 trigger_state.data.status = OrderStatus::Cancelling;
61 try_write_to(trigger_state.data, trigger_state.dest);
62
63 return true;
64}
备注
撤销预埋单成功之后, 需要手动查询预埋单才能获取预埋单的最新状态.
req_position
查询持仓
virtual bool req_position() = 0;
向券商柜台查询账户当前持仓, TD进程进入就绪状态是会触发该函数, 同时系统也会每分钟自动触发一次查询以同步最新的持仓状态.
将全局存储的账户等信息, 按照柜台api的要求直接填写或者构建所需的消息体后填写, 调用api完成持仓查询发送.
有的柜台支持同步查询和异步查询, 有的柜台只支持同步查询.
参数
参数 |
类型 |
说明 |
无 |
无 |
无 |
返回值
类型 |
说明 |
bool |
查询持仓成功返回true, 查询持仓失败返回false |
范例
1// xtp 异步查询持仓
2bool TraderXTP::req_position() {
3 SPDLOG_INFO("req_position");
4 return api_->QueryPosition(nullptr, session_id_, get_request_id()) == 0;
5}
6
7bool TraderXTP::custom_OnQueryPosition(const XTPQueryStkPositionRsp &position, const XTPRI &error_info, int request_id,
8 bool is_last, uint64_t session_id) {
9 if (error_info.error_id != 0) {
10 SPDLOG_ERROR("error_id:{}, error_msg: {}, request_id: {}, last: {}", error_info.error_id, error_info.error_msg,
11 request_id, is_last);
12 return false;
13 }
14
15 SPDLOG_TRACE("XTPQueryStkPositionRsp: {}", to_string(position));
16 auto writer = get_position_writer(); // 第一次获取writer是写入到PUBLIC
17 Position &stock_pos = writer->open_data<Position>(0);
18 from_xtp(position, stock_pos);
19 stock_pos.holder_uid = get_home_uid();
20 stock_pos.source_id = get_home_uid();
21 stock_pos.instrument_type = get_instrument_type(stock_pos.exchange_id, stock_pos.instrument_id);
22 stock_pos.direction = Direction::Long;
23 stock_pos.update_time = yijinjing::time::now_in_nano();
24 SPDLOG_TRACE("Position: {}", stock_pos.to_string());
25 writer->close_data();
26 if (is_last) {
27 PositionEnd &end = writer->open_data<PositionEnd>(0);
28 end.holder_uid = get_home_uid();
29 writer->close_data();
30 enable_positions_sync(); // 调用该函数后, 之后每分钟同步一次的查询获取到的writer是写入到SYNC
31 }
32 return true;
33}
34
35
36// 顶点股票柜台只支持同步查询
37bool TraderItp::req_position() {
38 long nRet = 1;
39 bool update = false;
40 int64 idx = 0;
41 auto writer = get_position_writer();
42 while (nRet > 0) {
43 vector<ITPDK_ZQGL> arZqgl;
44 arZqgl.reserve(200); // 需要预分配足够空间,查询结果最大返回200条
45 nRet =
46 (long)SECITPDK_QueryPositions(get_account_id().c_str(), SORT_TYPE_AES,
47 200, idx, "", "", "", 1, arZqgl);
48 SPDLOG_TRACE("req_position success. Num of results {}", nRet);
49 for (auto &itr : arZqgl) {
50 idx = itr.BrowIndex;
51 if (get_account_id() == itr.AccountId) {
52 update = true;
53 auto &pos = writer->open_data<Position>(0);
54 // SPDLOG_DEBUG("exchange {} instrumentid {}", itr.Market,
55 // itr.StockCode);
56 from_itp(itr, pos);
57 pos.holder_uid = get_home_uid();
58 pos.source_op_id = get_home_uid();
59 pos.source_id = get_home_uid();
60 pos.ledger_category = LedgerCategory::Account;
61 writer->close_data();
62 SPDLOG_TRACE("req_position: {}", pos.to_string());
63 SPDLOG_TRACE("req_position StockName:{} -- "
64 "AccountId:{}; StockCode:{}; CurrentQty:{}; BrowIndex:{}; "
65 "DiluteCostPrice:{}; AvgBuyPrice: {}; "
66 "CostBalance: {}",
67 gbk2utf8(itr.StockName), itr.AccountId, itr.StockCode,
68 (long)itr.CurrentQty, (long)itr.BrowIndex,
69 itr.DiluteCostPrice, itr.AvgBuyPrice, itr.CostBalance);
70 }
71 }
72 idx++;
73 }
74 if (update) {
75 auto &end = writer->open_data<PositionEnd>(now());
76 end.holder_uid = get_home_uid();
77 writer->close_data();
78 enable_positions_sync();
79 }
80 return true;
81}
req_account
查询资金
virtual bool req_account() = 0;
向券商柜台查询账户当前资金, TD进程进入就绪状态是会触发该函数, 同时系统也会每分钟自动触发一次查询, 每7秒内如果有成交也会触发一次查询, 以同步最新的资金.
将全局存储的账户等信息, 按照柜台api的要求直接填写或者构建所需的消息体后填写, 调用api完成持仓查询发送.
有的柜台支持同步查询和异步查询, 有的柜台只支持同步查询.
参数
参数 |
类型 |
说明 |
无 |
无 |
无 |
返回值
类型 |
说明 |
bool |
查询资金成功返回true, 查询资金失败返回false |
范例
1// xtp 异步查询资金
2bool TraderXTP::req_account() {
3 SPDLOG_INFO("req_account");
4 return api_->QueryAsset(session_id_, get_request_id()) == 0;
5}
6
7bool TraderXTP::custom_OnQueryAsset(const XTPQueryAssetRsp &asset, const XTPRI &error_info, int request_id,
8 bool is_last, uint64_t session_id) {
9 if (error_info.error_id != 0) {
10 SPDLOG_ERROR("error_id: {}, error_msg: {}, request_id: {}, last: {}", error_info.error_id, error_info.error_msg,
11 request_id, is_last);
12 }
13
14 if (error_info.error_id == 0 || error_info.error_id == 11000350) {
15 SPDLOG_TRACE("OnQueryAsset: {}", to_string(asset));
16 auto writer = get_asset_writer();
17 Asset &account = writer->open_data<Asset>(0);
18 if (error_info.error_id == 0) {
19 from_xtp(asset, account);
20 }
21 account.holder_uid = get_live_home_uid();
22 account.update_time = yijinjing::time::now_in_nano();
23 SPDLOG_TRACE("Asset: {}", account.to_string());
24 writer->close_data();
25 enable_asset_sync();
26 }
27 return true;
28}
29
30// 顶点股票柜台只支持同步查询
31bool TraderItp::req_account() {
32 vector<ITPDK_ZJZH> arZjzh;
33 arZjzh.reserve(5);
34 long nRet = (long)SECITPDK_QueryFundInfo(get_account_id().c_str(), arZjzh);
35 if (nRet < 0) // 查询失败
36 {
37 string msg = SECITPDK_GetLastError();
38 SPDLOG_ERROR("req_account failed. Msg: {}", gbk2utf8(msg));
39 return false;
40 } else {
41 SPDLOG_TRACE("req_account success. Num of results {}", nRet);
42 for (auto &itr : arZjzh) {
43 SPDLOG_TRACE("FundAccount:{};AccountId:{} -- "
44 "MoneyType:{};FundAvl:{};FrozenBalance:{};TotalAsset:{};"
45 "MarketValue:{}",
46 itr.FundAccount, itr.AccountId, itr.MoneyType, itr.FundAvl,
47 itr.FrozenBalance, itr.TotalAsset, itr.MarketValue);
48 if (strcmp(itr.AccountId, get_account_id().c_str()) ==
49 0 /* && strcmp(itr.FundAccount, fund_id_.c_str()) == 0*/) {
50 auto writer = get_asset_writer();
51 auto &asset = writer->open_data<Asset>(0);
52 memset(&asset, 0, sizeof(Asset));
53 asset.update_time = yijinjing::time::now_in_nano();
54 asset.avail = itr.FundAvl;
55 asset.frozen_cash = itr.FrozenBalance;
56 asset.holder_uid = get_home_uid();
57 asset.ledger_category = LedgerCategory::Account;
58 writer->close_data();
59 enable_asset_sync();
60 return true;
61 }
62 }
63 }
64 return false;
65}
req_history_order
查询历史委托
virtual bool req_history_order(const event_ptr &event);
向券商柜台查询账户历史委托, 以下两个场景需要调用柜台API的查询历史接口
TD重启后恢复今日委托的最新状态, 更新在关闭期间的委托状态变化到本地
策略想要获取今日的所有委托信息, 以HistoryOrder的数据格式进行推送
该接口为策略主动查询账户当日历史委托情况, 将全局存储的账户等信息, 按照柜台api的要求直接填写或者构建所需的消息体后填写, 调用api完成历史委托查询发送.
交易柜台通常只能查询到当前交易日内的历史委托情况, 有的柜台支持同步查询和异步查询, 有的柜台只支持同步查询.
参数
参数 |
类型 |
说明 |
event |
const event_ptr & |
包含该查询操作来源等信息 |
返回值
类型 |
说明 |
bool |
查询历史委托成功返回true, 查询历史委托失败返回false |
范例
1// xtp 异步查询历史委托
2bool TraderXTP::req_history_order(const event_ptr &event) {
3 XTPQueryOrderReq query_param{};
4 int request_id = get_request_id();
5 int ret = api_->QueryOrders(&query_param, session_id_, request_id);
6 if (0 != ret) {
7 SPDLOG_ERROR("QueryOrders False: {}", ret);
8 }
9 map_request_location_.emplace(request_id, event->source()); // 标记为策略查询历史委托
10 return 0 == ret;
11}
12
13bool TraderXTP::custom_OnQueryOrder(const XTPOrderInfo &order_info, const XTPRI &error_info, int request_id,
14 bool is_last, uint64_t session_id) {
15 SPDLOG_DEBUG("XTPOrderInfo: {}", to_string(order_info));
16 SPDLOG_DEBUG("XTPRI: {}", to_string(error_info));
17 SPDLOG_DEBUG("request_id: {}, is_last: {}", request_id, is_last);
18
19 if (order_info.order_xtp_id == 0 and is_last and
20 map_request_location_.find(request_id) != map_request_location_.end()) {
21 SPDLOG_WARN("XTPQueryOrderRsp* order_info == nullptr, no data returned!");
22 auto writer = get_history_writer(request_id);
23 HistoryOrder &history_order = writer->open_data<HistoryOrder>();
24 history_order.is_last = true;
25 history_order.data_type = HistoryDataType::TotalEnd;
26 const std::string msg = "No order today";
27 history_order.error_msg = msg.c_str();
28 writer->close_data();
29 SPDLOG_DEBUG("HistoryOrder: {}", history_order.to_string());
30 return false;
31 }
32
33 // 此处判断是属于启动TD时恢复查询还是策略查询
34 if (map_request_location_.find(request_id) == map_request_location_.end()) {
35 // TD重连收到推送当做普通下单委托响应处理
36 if (is_last) {
37 req_order_over_ = true;
38 try_ready();
39 }
40 return order_info.order_xtp_id != 0 and custom_OnOrderEvent(order_info, error_info, request_id);
41 }
42
43 auto writer = get_history_writer(request_id);
44 HistoryOrder &history_order = writer->open_data<HistoryOrder>();
45
46 if (error_info.error_id != 0) {
47 SPDLOG_ERROR("OnQueryOrder False , error_code : {}, error_msg : {}", error_info.error_id, error_info.error_msg);
48 history_order.error_id = error_info.error_id;
49 history_order.error_msg = error_info.error_msg;
50 }
51
52 from_xtp(order_info, history_order);
53 history_order.order_id = writer->current_frame_uid();
54 history_order.is_last = is_last;
55 history_order.insert_time = yijinjing::time::now_in_nano();
56 history_order.update_time = history_order.insert_time;
57 SPDLOG_DEBUG("HistoryOrder: {}", history_order.to_string());
58 writer->close_data();
59 return true;
60}
61
62
63// xtp在启动时查询今日委托和成交信息, 没有标记 map_request_location_, custom_OnQueryOrder会走到custom_OnOrderEvent作为普通委托处理
64void TraderXTP::req_order_trade() {
65 if (disable_recover_) {
66 return try_ready();
67 }
68
69 XTPQueryOrderReq query_order_param{};
70 int ret = api_->QueryOrders(&query_order_param, session_id_, get_request_id());
71 if (0 != ret) {
72 SPDLOG_ERROR("QueryOrders False: {}", ret);
73 }
74
75 XTPQueryTraderReq query_trade_param{};
76 ret = api_->QueryTrades(&query_trade_param, session_id_, get_request_id());
77 if (0 != ret) {
78 SPDLOG_ERROR("QueryTrades False : {}", ret);
79 }
80}
81
82
83// 顶点股票柜台同步查询历史委托
84bool TraderItp::req_history_order(const event_ptr &event) {
85 SPDLOG_INFO("req_history_order");
86 const RequestHistoryOrder &request = event->data<RequestHistoryOrder>();
87 SPDLOG_INFO("RequestHistoryOrder: {}", request.to_string());
88 uint32_t source = event->source();
89 uint32_t limit = request.query_num;
90
91 auto writer = get_writer(event->source());
92 if (req_history_order_query_.find(source) == req_history_order_query_.end()) {
93 long nRet = 1;
94 std::vector<ITPDK_DRWT> his_orders;
95 req_history_order_query_.insert(std::make_pair(source, his_orders));
96 int64 idx = 0;
97 while (nRet > 0) {
98 vector<ITPDK_DRWT> arDrwt;
99 arDrwt.reserve(200); // 需要预分配足够空间,查询结果最大返回200条
100 nRet =
101 (long)SECITPDK_QueryOrders(get_account_id().c_str(), 0, SORT_TYPE_AES,
102 200, idx, "", "", 0, arDrwt);
103 req_history_order_query_[source].insert(
104 req_history_order_query_[source].end(), arDrwt.begin(), arDrwt.end());
105 idx = arDrwt.back().BrowIndex + 1;
106
107 if (nRet < 0) {
108 std::string error_msg = gbk2utf8(SECITPDK_GetLastError());
109 SPDLOG_ERROR("req_history_order ERROR! SECITPDK_QueryOrders return: {} "
110 ", error message : {}",
111 nRet, error_msg);
112
113 RequestHistoryOrderError &error =
114 writer->open_data<RequestHistoryOrderError>(event->gen_time());
115 error.error_id = nRet;
116 strncpy(error.error_msg, error_msg.c_str(), ERROR_MSG_LEN);
117 error.trigger_time = event->gen_time();
118 writer->close_data();
119 return false;
120 }
121 }
122 size_t ret_size = req_history_order_query_[source].size();
123 SPDLOG_INFO("req_history_order success. Num of results {}", ret_size);
124 }
125 auto it = req_history_order_query_[source].begin();
126 while (it != req_history_order_query_[source].end() && limit > 0) {
127 HistoryOrder &history_order = writer->open_data<HistoryOrder>(now());
128 from_itp(*it, history_order, trading_day_);
129 history_order.order_id = writer->current_frame_uid();
130 limit--;
131 it++;
132 if (it == req_history_order_query_[source].end()) {
133 history_order.data_type = HistoryDataType::TotalEnd;
134 } else if (limit == 0) {
135 history_order.data_type = HistoryDataType::PageEnd;
136 } else {
137 history_order.data_type = HistoryDataType::Normal;
138 }
139 writer->close_data();
140 }
141 if (it == req_history_order_query_[source].end()) {
142 req_history_order_query_.erase(source);
143 } else {
144 req_history_order_query_[source].erase(
145 req_history_order_query_[source].begin(),
146 req_history_order_query_[source].begin() + request.query_num);
147 }
148 return true;
149}
req_history_trade
查询历史成交
virtual bool req_history_trade(const event_ptr &event);
向券商柜台查询账户历史成交, 以下两个场景需要调用柜台API的查询历史接口
TD重启后恢复今日成交, 更新在关闭期间的成交到本地
策略想要获取今日的所有成交信息, 以HistoryTrade的数据格式进行推送
该接口为策略主动查询账户当日历史成交情况, 将全局存储的账户等信息, 按照柜台api的要求直接填写或者构建所需的消息体后填写, 调用api完成历史成交查询发送.
交易柜台通常只能查询到当前交易日内的历史成交情况, 有的柜台支持同步查询和异步查询, 有的柜台只支持同步查询.
参数
参数 |
类型 |
说明 |
event |
const event_ptr & |
包含该查询操作来源等信息 |
返回值
类型 |
说明 |
bool |
查询历史成交成功返回true, 查询历史成交失败返回false |
范例
1//
2bool TraderXTP::req_history_trade(const event_ptr &event) {
3 XTPQueryTraderReq query_param{};
4 int request_id = get_request_id();
5 int ret = api_->QueryTrades(&query_param, session_id_, request_id);
6 if (0 != ret) {
7 SPDLOG_ERROR("QueryTrades False : {}", ret);
8 }
9 map_request_location_.emplace(request_id, event->source());
10 return 0 == ret;
11}
12
13bool TraderXTP::custom_OnQueryTrade(const XTPTradeReport &trade_info, const XTPRI &error_info, int request_id,
14 bool is_last, uint64_t session_id) {
15 SPDLOG_DEBUG("XTPTradeReport: {}", to_string(trade_info));
16 SPDLOG_DEBUG("XTPRI: {}", to_string(error_info));
17 SPDLOG_DEBUG("request_id: {}, is_last: {}", request_id, is_last);
18
19 // 查询历史流水收到nullptr, 经过journal走一圈后表现形式为 order_xtp_id == 0
20 if (trade_info.order_xtp_id == 0 and is_last and
21 map_request_location_.find(request_id) != map_request_location_.end()) {
22 SPDLOG_WARN("XTPQueryTradeRsp* trade_info == nullptr, no data returned!");
23 auto writer = get_history_writer(request_id);
24 HistoryTrade &history_trade = writer->open_data<HistoryTrade>(now());
25 history_trade.is_last = true;
26 history_trade.data_type = HistoryDataType::TotalEnd;
27 const std::string msg = "No trade today";
28 history_trade.error_msg = msg.c_str();
29 writer->close_data();
30 return false;
31 }
32
33 if (map_request_location_.find(request_id) == map_request_location_.end()) {
34 // TD重连收到推送当做普通交易成交回报推送处理
35 if (is_last) {
36 req_trade_over_ = true;
37 try_ready();
38 }
39 return trade_info.order_xtp_id != 0 and custom_OnTradeEvent(trade_info, session_id);
40 }
41
42 auto writer = get_history_writer(request_id);
43 HistoryTrade &history_trade = writer->open_data<HistoryTrade>(now());
44
45 if (error_info.error_id != 0) {
46 SPDLOG_ERROR("OnQueryTrade False , error_code : {}, error_msg : {}", error_info.error_id, error_info.error_msg);
47 history_trade.error_id = error_info.error_id;
48 history_trade.error_msg = error_info.error_msg;
49 }
50
51 from_xtp(trade_info, history_trade);
52 history_trade.trade_id = writer->current_frame_uid();
53 history_trade.is_last = is_last;
54 history_trade.trade_time = yijinjing::time::now_in_nano();
55 history_trade.instrument_type = get_instrument_type(history_trade.exchange_id, history_trade.instrument_id);
56 SPDLOG_DEBUG("HistoryTrade: {}", history_trade.to_string());
57 writer->close_data();
58 return false;
59}
60
61
62// 顶点股票柜台同步查询历史委托
63bool TraderItp::req_history_trade(const event_ptr &event) {
64 SPDLOG_INFO("req_history_trade");
65 const RequestHistoryTrade &request = event->data<RequestHistoryTrade>();
66 SPDLOG_INFO("RequestHistoryTrade: {}", request.to_string());
67 uint32_t source = event->source();
68 uint32_t limit = request.query_num;
69
70 auto writer = get_writer(event->source());
71 if (req_history_trade_query_.find(source) == req_history_trade_query_.end()) {
72 long nRet = 1;
73 std::vector<ITPDK_SSCJ> his_trades;
74 req_history_trade_query_.insert(std::make_pair(source, his_trades));
75 int64 idx = 0;
76 while (nRet > 0) {
77 vector<ITPDK_SSCJ> arSscj;
78 arSscj.reserve(200); // 需要预分配足够空间,查询结果最大返回200条
79 nRet =
80 (long)SECITPDK_QueryMatchs(get_account_id().c_str(), 0, SORT_TYPE_AES,
81 200, idx, "", "", 0, arSscj);
82 req_history_trade_query_[source].insert(
83 req_history_trade_query_[source].end(), arSscj.begin(), arSscj.end());
84 idx = arSscj.back().BrowIndex + 1;
85
86 if (nRet < 0) {
87 std::string error_msg = gbk2utf8(SECITPDK_GetLastError());
88 SPDLOG_ERROR("req_history_trade ERROR! SECITPDK_QueryMatchs return: {} "
89 ", error message : {}",
90 nRet, error_msg);
91 RequestHistoryTradeError &error =
92 writer->open_data<RequestHistoryTradeError>(event->gen_time());
93 error.error_id = nRet;
94 strncpy(error.error_msg, error_msg.c_str(), ERROR_MSG_LEN);
95 error.trigger_time = event->gen_time();
96 writer->close_data();
97 return false;
98 }
99 }
100 size_t ret_size = req_history_trade_query_[source].size();
101 SPDLOG_DEBUG("req_history_trade success. Num of results {}", ret_size);
102 }
103 auto it = req_history_trade_query_[source].begin();
104 while (it != req_history_trade_query_[source].end() && limit > 0) {
105 HistoryTrade &history_trade = writer->open_data<HistoryTrade>(now());
106 from_itp(*it, history_trade, trading_day_);
107 history_trade.trade_id = writer->current_frame_uid();
108 limit--;
109 it++;
110 if (it == req_history_trade_query_[source].end()) {
111 history_trade.data_type = HistoryDataType::TotalEnd;
112 } else if (limit == 0) {
113 history_trade.data_type = HistoryDataType::PageEnd;
114 } else {
115 history_trade.data_type = HistoryDataType::Normal;
116 }
117 writer->close_data();
118 }
119 if (it == req_history_trade_query_[source].end()) {
120 req_history_trade_query_.erase(source);
121 } else {
122 req_history_trade_query_[source].erase(
123 req_history_trade_query_[source].begin(),
124 req_history_trade_query_[source].begin() + request.query_num);
125 }
126 return true;
127}
req_order_trigger
查询预埋单
virtual bool req_order_trigger();
向券商柜台查询预埋单最新状态, 预埋单触发或者撤销后, 不会实时推送预埋单的最新状态, 需要手动查询
参数
参数 |
类型 |
说明 |
event |
const event_ptr & |
包含该查询操作来源等信息 |
返回值
类型 |
说明 |
bool |
查询历史成交成功返回true, 查询历史成交失败返回false |
范例
1// ctp的查询预埋单的实现
2bool TraderCTP::req_order_trigger() { return req_trigger(); }
3
4bool TraderCTP::req_trigger() {
5 if (disable_recover_) {
6 return false;
7 }
8
9 add_timer_req_insert_trigger(now() + 1 * time_unit::NANOSECONDS_PER_SECOND);
10 add_timer_req_cancel_trigger(now() + 3 * time_unit::NANOSECONDS_PER_SECOND);
11 return true;
12}
13
14// 查询预埋下单
15void TraderCTP::add_timer_req_insert_trigger(int64_t nano) {
16 add_timer(nano, [&](const auto &event) {
17 CThostFtdcQryParkedOrderField req_parked{};
18 strcpy(req_parked.BrokerID, config_.broker_id.c_str());
19 strcpy(req_parked.InvestorID, config_.account_id.c_str());
20 SPDLOG_INFO("CThostFtdcQryParkedOrderField: {}", to_string(req_parked));
21 int request_id = get_request_id();
22 int rtn = api_->ReqQryParkedOrder(&req_parked, request_id);
23 SPDLOG_INFO("ReqQryTrade rtn {}", rtn);
24 if (rtn != 0) {
25 add_timer_req_insert_trigger(time::now_in_nano() + 3 * time_unit::NANOSECONDS_PER_SECOND);
26 }
27 });
28}
29
30// 查询预埋撤单
31void TraderCTP::add_timer_req_cancel_trigger(int64_t nano) {
32 add_timer(nano, [&](const auto &event) {
33 CThostFtdcQryParkedOrderActionField req_parked_action{};
34 strcpy(req_parked_action.BrokerID, config_.broker_id.c_str());
35 strcpy(req_parked_action.InvestorID, config_.account_id.c_str());
36 SPDLOG_INFO("CThostFtdcQryParkedOrderActionField: {}", to_string(req_parked_action));
37 int request_id = get_request_id();
38 int rtn = api_->ReqQryParkedOrderAction(&req_parked_action, request_id);
39 SPDLOG_INFO("ReqQryTrade rtn {}", rtn);
40 if (rtn != 0) {
41 add_timer_req_cancel_trigger(time::now_in_nano() + 3 * time_unit::NANOSECONDS_PER_SECOND);
42 }
43 });
44}
备注
ctp有流控限制, 连续查询可能会失败, 失败后可以添加一个定时器3秒后再查一次.
req_contract
两融合约查询
virtual bool req_contract();
向券商柜台查询两融合约状态, 融资买入和融券卖出成交后, 两融合约不会实时推送, 想要获取当前最新的两融合约状态需要手动查询.
范例
1// 华锐两融查询合约
2bool TraderATP::req_contract() { return req_contractSpecifications(false); }
3
4// 融资融券合约明细查询函数
5bool TraderATP::req_contractSpecifications(bool is_query_position) {
6 if (!req_contract_status_) {
7 // 如果上次点击前端按钮查合约 还没有结束 就不能重复查询
8 // 因为查持仓那里已经有防止重复查的开关了 所以持仓查合约是不会重复查的 可以不做处理
9 SPDLOG_ERROR("上次更新尚未结束 请稍后再点击更新合约按钮");
10 return false;
11 }
12 if (!is_query_position) {
13 req_contract_status_ = false;
14 }
15 SPDLOG_INFO("融资融券合约明细查询 req_contractSpecifications");
16 ATPReqExtQueryContractSpecificationsMsg req_contract_specifications_msg{};
17
18 strcpy(req_contract_specifications_msg.cust_id, cust_id_.c_str()); /// 客户号ID(必填)
19 strcpy(req_contract_specifications_msg.fund_account_id, td_config_.fund_account_id.c_str()); /// 资金账户ID(必填)
20 int64_t seq_id = get_client_seq_id();
21 if (is_query_position) {
22 set_contract_id_.insert(seq_id);
23 }
24 req_contract_specifications_msg.client_seq_id = seq_id; /// 用户系统消息序号(必填)
25 strcpy(req_contract_specifications_msg.branch_id, td_config_.branch_id.c_str()); /// 营业部ID(必填)
26 ATPRetCodeType ret = atp_trader_api_ptr_->ReqExtQueryContractSpecifications(&req_contract_specifications_msg);
27 SPDLOG_INFO("req_contract return code : {} , "
28 "return message : {}",
29 ret, map_error_code.try_emplace(ret).first->second);
30 if (ret != ErrorCode::kSuccess) {
31 SPDLOG_ERROR("req_contract error return code : {} , "
32 "return message : {}",
33 ret, map_error_code.try_emplace(ret).first->second);
34 }
35 return true;
36}
on_band
virtual void on_band(const event_ptr &event);
当有一个进程调用request_band创建一个写入信道时, master会广播这条信道的信息, 所有的进程都会收到一个Band信息, 可以在on_band中选择是否要订阅这条信道.
一般场景下不会用到该接口.
主要主调接口
update_broker_state
void update_broker_state(BrokerState state);
修改TD的状态
范例
1// xtp登录失败后将TD状态设置成LoginFailed
2void TraderXTP::on_start() {
3 if (config_.client_id < 1 or config_.client_id > 99) {
4 SPDLOG_ERROR("client_id must between 1 and 99");
5 }
6 std::string runtime_folder = get_runtime_folder();
7 SPDLOG_INFO("Connecting XTP account {} with tcp://{}:{}", config_.account_id, config_.td_ip, config_.td_port);
8 api_ = XTP::API::TraderApi::CreateTraderApi(config_.client_id, runtime_folder.c_str());
9 api_->RegisterSpi(this);
10 api_->SubscribePublicTopic(XTP_TERT_QUICK);
11 api_->SetSoftwareVersion("1.1.0");
12 api_->SetSoftwareKey(config_.software_key.c_str());
13 session_id_ = api_->Login(config_.td_ip.c_str(), config_.td_port, config_.account_id.c_str(),
14 config_.password.c_str(), XTP_PROTOCOL_TCP);
15 if (session_id_ > 0) {
16 SPDLOG_INFO("Login successfully");
17 req_order_trade();
18 } else {
19 update_broker_state(BrokerState::LoginFailed);
20 SPDLOG_ERROR("Login failed [{}]: {}", api_->GetApiLastError()->error_id, api_->GetApiLastError()->error_msg);
21 }
22}
23
24// xtp恢复完委托和成交后将TD状态设置成Ready
25void TraderXTP::try_ready() {
26 if (BrokerState::Ready == get_state()) {
27 return;
28 }
29
30 SPDLOG_DEBUG("req_order_over_: {}, req_trade_over_: {}", req_order_over_, req_trade_over_);
31 if (disable_recover_ or (req_order_over_ and req_trade_over_)) {
32 update_broker_state(BrokerState::Ready);
33 }
34}
get_orders
const OrderMap &get_orders();
获取内存里的所有委托Order
范例
1// 在on_recover中根据委托恢复Kungfu的order_id和xtp的委托号映射关系
2for (auto &pair : get_orders()) {
3 SPDLOG_DEBUG("Order: {}", pair.second.data.to_string());
4 const std::string str_external_order_id = pair.second.data.external_order_id.to_string();
5 if (not str_external_order_id.empty()) {
6 uint64_t order_id = pair.first;
7 uint64_t order_xtp_id = std::stoull(str_external_order_id);
8 map_xtp_to_kf_order_id_.emplace(order_xtp_id, order_id);
9 map_kf_to_xtp_order_id_.emplace(order_id, order_xtp_id);
10 }
11}
get_trades
const TradeMap &get_trades();
获取内存里的所有成交Trade
范例
1// 在on_recover中根据成交恢复已处理的成交编号
2for (auto &pair : get_trades()) {
3 SPDLOG_DEBUG("Trade: {}", pair.second.data.to_string());
4 uint64_t order_xtp_id = std::stoull(pair.second.data.external_order_id);
5 map_xtp_order_id_to_xtp_trader_ids_.try_emplace(order_xtp_id)
6 .first->second.emplace(pair.second.data.external_trade_id.to_string());
7}
get_order_triggers
const OrderTriggerMap &get_order_triggers()
获取内存里的所有预埋单OrderTrigger
范例
1// ctp在on_recover中根据预埋单恢复Kungfu的trigger_id和ctp的预埋单编号映射
2for (const auto &trigger_pair : get_order_triggers()) {
3 const OrderTrigger &trigger = trigger_pair.second.data;
4 map_trigger_id_to_ParkedOrderID_.insert_or_assign(
5 trigger.trigger_id, std::pair<std::string, bool>{trigger.external_trigger_id,
6 trigger.action_flag == OrderTriggerFlag::TriggerCancel});
7 map_ParkedOrderId_to_trigger_id_.insert_or_assign(trigger.external_trigger_id, trigger.trigger_id);
8 SPDLOG_DEBUG("OrderTrigger: {}", trigger.to_string());
9}
disable_recover
void disable_recover();
关闭恢复委托和成交, 在pre_start调用改接口后, 重启td之后不再恢复关闭前的委托数据, 并且将处于未完成状态的委托设置成 “丢失” 状态. 如果不调用该接口, 重启td时, 会自动从journal和db中读取从昨天下午4点开始的到现在的所有委托和成交数据, 恢复到内存中, 可以通过get_orders(), get_trades(), get_order_triggers()获取数据
范例
1// xtp在pre_start中根据配置信息设置是否要关闭恢复委托和成交
2void TraderXTP::pre_start() {
3 config_ = nlohmann::json::parse(get_config());
4 SPDLOG_INFO("config: {}", get_config());
5 if (not config_.recover_order_trade) {
6 disable_recover();
7 }
8}
has_writer
bool has_writer(uint32_t dest_id) const;
判断是否存在写给dest的writer
get_writer
writer_ptr get_writer(uint32_t dest_id) const;
获取写给dest的writer
范例
1// xtp在处理成交数据时, 根据是否有writer来决定是否可以使用get_writer
2if (has_writer(order_state.dest)) {
3 auto writer = get_writer(order_state.dest);
4 Trade &trade = writer->open_data<Trade>(now());
5 from_xtp(trade_info, trade);
6 trade.trade_id = writer->current_frame_uid();
7 trade.order_id = kf_order_id;
8 add_traded_volume(trade_info.order_xtp_id, trade.volume);
9 SPDLOG_DEBUG("Trade: {}", trade.to_string());
10 writer->close_data();
11} else {
12 Trade trade{};
13 from_xtp(trade_info, trade);
14 trade.trade_id = get_public_writer()->current_frame_uid() xor (time::now_in_nano() & 0xFFFFFFFF);
15 trade.order_id = kf_order_id;
16 add_traded_volume(trade_info.order_xtp_id, trade.volume);
17 SPDLOG_DEBUG("Trade: {}", trade.to_string());
18 try_write_to(trade, order_state.dest);
19}
get_public_writer
writer_ptr &get_public_writer()
dest为0的writer单独使用一个变量存放, 调用该接口直接返回写入dest为0的writer, 效果等价于get_writer(0), 区别是该接口不涉及stl容器访问, 可以在子线程调用
范例
1// xtp生成系统外订单时, 将数据写入到PUBLIC的journal
2auto writer = get_public_writer();
3auto nano = yijinjing::time::now_in_nano();
4Order &order = writer->open_data<Order>(now());
5order.order_id = writer->current_frame_uid();
6from_xtp(order_info, order);
7order.insert_time = nsec_from_xtp_timestamp(order_info.insert_time);
8order.update_time = nano;
9map_kf_to_xtp_order_id_.emplace(uint64_t(order.order_id), order_info.order_xtp_id);
10map_xtp_to_kf_order_id_.emplace(order_info.order_xtp_id, uint64_t(order.order_id));
11SPDLOG_DEBUG("Order: {}", order.to_string());
12writer->close_data();
13try_deal_XTPTradeReport(order_info.order_xtp_id);
open_data<T>
template <typename T> std::enable_if_t<size_fixed_v<T>, T &> open_data(int64_t trigger_time = 0);
该接口是属于writer的接口, 用于生成Order, Trade 等数据使用
close_data
void close_data(int64_t gen_time = time::now_in_nano());
该接口是属于writer的接口, 用于在open_data完成数据写入后, 标记写入完成;
open_data和close_data必须配对使用, 同一个writer中间不可以进行嵌套
范例
1// xtp生成系统外订单时, 使用open_data和close_data对来生成Order
2auto writer = get_public_writer();
3auto nano = yijinjing::time::now_in_nano();
4Order &order = writer->open_data<Order>(now());
5order.order_id = writer->current_frame_uid();
6from_xtp(order_info, order);
7order.insert_time = nsec_from_xtp_timestamp(order_info.insert_time);
8order.update_time = nano;
9map_kf_to_xtp_order_id_.emplace(uint64_t(order.order_id), order_info.order_xtp_id);
10map_xtp_to_kf_order_id_.emplace(order_info.order_xtp_id, uint64_t(order.order_id));
11SPDLOG_DEBUG("Order: {}", order.to_string());
12writer->close_data();
13try_deal_XTPTradeReport(order_info.order_xtp_id);
data<T>
template <typename T> std::enable_if_t<size_fixed_v<T> or std::is_same_v<T, nlohmann::json>, const T &> data() const;
template <typename T> std::enable_if_t<not size_fixed_v<T> and not std::is_same_v<T, nlohmann::json>, const T> data() const;
该接口属于event类, 一般用于在回调接口中, 通过event指针获取数据, 如果数据类型是没有类似于string这样的长度大小固定的, 返回的就是数据的引用;
如果是Register这种有string类型长度未定的, 返回的就是数据的拷贝.
范例
1// insert_order中获取OrderInput
2const OrderInput &input = event->data<OrderInput>();
3
4// cancel_order中获取OrderAction
5const OrderAction &action = event->data<OrderAction>();
order_from_input
inline void order_from_input(const longfist::types::OrderInput &input, longfist::types::Order &order);
根据OrderInput生成Order, 主要用于在insert_order时将数据进行转换.
范例
1// 在insert_order中根据OrderInput生成OrderInput
2Order &order = writer->open_data<Order>(event->gen_time());
3order_from_input(input, order);
4order.external_order_id = std::to_string(order_xtp_id).c_str();
5order.insert_time = nano;
6order.update_time = nano;
7
8if (success) {
9 map_kf_to_xtp_order_id_.emplace(uint64_t(input.order_id), order_xtp_id);
10 map_xtp_to_kf_order_id_.emplace(order_xtp_id, uint64_t(input.order_id));
11} else {
12 auto error_info = api_->GetApiLastError();
13 order.error_id = error_info->error_id;
14 order.error_msg = error_info->error_msg;
15 order.status = OrderStatus::Error;
16}
17
18SPDLOG_DEBUG("Order: {}", order.to_string());
19writer->close_data();
20
21// 在收到成交推送后写入Trade
22if (has_writer(order_state.dest)) {
23 auto writer = get_writer(order_state.dest);
24 Trade &trade = writer->open_data<Trade>(now());
25 from_xtp(trade_info, trade);
26 trade.trade_id = writer->current_frame_uid();
27 trade.order_id = kf_order_id;
28 add_traded_volume(trade_info.order_xtp_id, trade.volume);
29 SPDLOG_DEBUG("Trade: {}", trade.to_string());
30 writer->close_data();
31} else {
32 Trade trade{};
33 from_xtp(trade_info, trade);
34 trade.trade_id = get_public_writer()->current_frame_uid() xor (time::now_in_nano() & 0xFFFFFFFF);
35 trade.order_id = kf_order_id;
36 add_traded_volume(trade_info.order_xtp_id, trade.volume);
37 SPDLOG_DEBUG("Trade: {}", trade.to_string());
38 try_write_to(trade, order_state.dest);
39}
write_to
template <typename DataType> void write_to(const DataType &data, uint32_t dest_id = yijinjing::data::location::PUBLIC);
将数据类型为DataType的数据data, 写入到dest为dest_id的journal中, 其效果等价于以下实现
1auto writer = get_writer(dest);
2auto &data_to_write = writer->open_data<DataType>(now());
3memcpy(&data_to_write, &data, sizeof(DataType));
4writer->close_data();
备注
调用get_writer和write_to的前提条件是, 存在写给dest_id的writer, 如果不存在, 则会报错崩溃.
与open_data再close_data的区别是, write_to是把已经存在数据拷贝一份到共享内存里, 如果数据已经存在, 不需要从柜台数据进行转换获得, 两种方式执行效率没有区别, 如果数据本身不存在, 需要从柜台API进行转换获取, 使用open_data和close_data可以直接将数据转换到共享内存里, 少一次数据拷贝.
try_write_to
template <typename DataType> void try_write_to(const DataType &data, uint32_t dest_id = yijinjing::data::location::PUBLIC, const std::function<void()> &callback = []() {});
执行效果类似于write_to, 不同的是两点
写完以后会执行一个callback函数
如果不存在dest_id的writer, 则会创建该writer之后再进行写入
1// xtp收到委托推送时, 不管是td重启恢复委托后查询, 还是正常交易中收到数据, 使用try_write_to更新Order可以在没有dest的writer时可以写入
2bool TraderXTP::custom_OnOrderEvent(const XTPOrderInfo &order_info, const XTPRI &error_info, uint64_t session_id) {
3SPDLOG_DEBUG("XTPOrderInfo: {}", to_string(order_info));
4SPDLOG_DEBUG("session_id: {}, XTPRI: {}", session_id, to_string(error_info));
5
6auto order_xtp_id_iter = map_xtp_to_kf_order_id_.find(order_info.order_xtp_id);
7if (order_xtp_id_iter == map_xtp_to_kf_order_id_.end()) {
8 SPDLOG_WARN("unrecognized order_xtp_id {}@{}", order_info.order_xtp_id, trading_day_);
9 return generate_external_order(order_info);
10}
11
12uint64_t kf_order_id = order_xtp_id_iter->second;
13if (not has_order(kf_order_id)) {
14 return generate_external_order(order_info);
15}
16
17auto &order_state = get_order(kf_order_id);
18if (not is_final_status(order_state.data.status) or order_state.data.status == OrderStatus::Lost) {
19 from_xtp_no_price_type(order_info, order_state.data);
20 order_state.data.update_time = yijinjing::time::now_in_nano();
21 if (error_info.error_id != 0) {
22 order_state.data.error_id = error_info.error_id;
23 order_state.data.error_msg = error_info.error_msg;
24 }
25 try_write_to(order_state.data, order_state.dest);
26 SPDLOG_DEBUG("Order: {}", order_state.data.to_string());
27 try_deal_XTPTradeReport(order_info.order_xtp_id);
28}
29return true;
30}
get_thread_writer
writer_ptr &get_thread_writer();
获取线程私有的writer, 作用是给柜台API回调子线程将获取的数据写入到共享内存里, 让主线程从journal读取后再做业务处理, 可以避免容器加锁问题的同时对数据落地, 方便回放
open_custom_data<T>
template <typename T> T &open_custom_data(int32_t msg_type, int64_t trigger_time = 0)
作用于open_data类似, 区别是专门用于写入Kungfu在types.h中定义之外的数据, 同样需要和close_data配对, 同一个writer不能有嵌套
范例
1// xtp子线程中将成交数据写入到共享内存journal里
2auto &bf_order_info = get_thread_writer()->open_custom_data<BufferXTPOrderInfo>(kXTPOrderInfoType, now());
3memcpy(&bf_order_info.order_info, order_info, sizeof(XTPOrderInfo));
4bf_order_info.session_id = session_id;
5if (error_info != nullptr) {
6 memcpy(&bf_order_info.error_info, error_info, sizeof(XTPRI));
7} else {
8 memset(&bf_order_info.error_info, 0, sizeof(XTPRI));
9}
10SPDLOG_DEBUG("BufferXTPOrderInfo: {}", to_string(bf_order_info));
11get_thread_writer()->close_data();
has_order
bool has_order(uint64_t order_id) const;
判断是否存在改order_id的委托
get_order
state<Order> &get_order(uint64_t order_id);
根据order_id获取对应的委托, 如果不存在对应的委托会崩溃, 需要先调用has_order判断是否存在该委托
is_final_status
bool is_final_status(const longfist::enums::OrderStatus &status)
判断委托是否处于最终状态, 一般用于在收到委托推送或者成交推送时, 需要修改Order的成交数量和状态信息, 可能出现乱序推送的情况, 导致先处理了最终状态的推送, 再收到中间状态的推送, 当Order已经处于最终状态时, 不再修改Order.
范例
1// xtp收到委托推送, 根据映射获取的order_id判断是否存在该委托, 再获取对应的Order
2bool TraderXTP::custom_OnOrderEvent(const XTPOrderInfo &order_info, const XTPRI &error_info, uint64_t session_id) {
3 SPDLOG_DEBUG("XTPOrderInfo: {}", to_string(order_info));
4 SPDLOG_DEBUG("session_id: {}, XTPRI: {}", session_id, to_string(error_info));
5
6 auto order_xtp_id_iter = map_xtp_to_kf_order_id_.find(order_info.order_xtp_id);
7 if (order_xtp_id_iter == map_xtp_to_kf_order_id_.end()) {
8 SPDLOG_WARN("unrecognized order_xtp_id {}@{}", order_info.order_xtp_id, trading_day_);
9 return generate_external_order(order_info);
10 }
11
12 uint64_t kf_order_id = order_xtp_id_iter->second;
13 if (not has_order(kf_order_id)) {
14 return generate_external_order(order_info);
15 }
16
17 auto &order_state = get_order(kf_order_id);
18 if (not is_final_status(order_state.data.status) or order_state.data.status == OrderStatus::Lost) {
19 from_xtp_no_price_type(order_info, order_state.data);
20 order_state.data.update_time = yijinjing::time::now_in_nano();
21 if (error_info.error_id != 0) {
22 order_state.data.error_id = error_info.error_id;
23 order_state.data.error_msg = error_info.error_msg;
24 }
25 try_write_to(order_state.data, order_state.dest);
26 SPDLOG_DEBUG("Order: {}", order_state.data.to_string());
27 try_deal_XTPTradeReport(order_info.order_xtp_id);
28 }
29 return true;
30}
has_order_action
bool has_order_action(uint64_t action_id) const;
判断是否存在该action_id的撤单操作
get_order_action
state<OrderAction> &get_order_action(uint64_t action_id);
根据action_id获取对应的撤单操作, 如果不存在对应的撤单操作会崩溃, 需要先调用has_order_action判断是否存在该委托
范例
1// xtp撤单报错时, 根据撤单时构建的映射关系获取action_id后, 根据action_id判断是否存在对应的撤单操作, 获取撤单操作后生成OrderActionError
2bool TraderXTP::custom_OnCancelOrderError(const XTPOrderCancelInfo &cancel_info, const XTPRI &error_info,
3 uint64_t session_id) {
4SPDLOG_DEBUG("XTPOrderCancelInfo: {}", to_string(cancel_info));
5SPDLOG_DEBUG("session_id: {}, XTPRI: {}", session_id, to_string(error_info));
6
7uint64_t action_id = get_action_id(cancel_info.order_xtp_id);
8if (not has_order_action(action_id)) {
9 SPDLOG_WARN("has not related OrderAction of {}:{}", cancel_info.order_xtp_id, action_id);
10 return false;
11}
12
13auto action_state = get_order_action(action_id);
14auto order_id = action_state.data.order_id;
15if (not has_order(order_id)) {
16 SPDLOG_WARN("order_id not in orders_ {}", order_id);
17 return false;
18}
19
20auto order_state = get_order(order_id);
21if (has_writer(order_state.dest)) {
22 OrderActionError &error = get_writer(order_state.dest)->open_data<OrderActionError>(now());
23 error.order_id = order_state.data.order_id; // 订单ID
24 std::string str_external_order_id = std::to_string(cancel_info.order_xtp_id);
25 error.external_order_id = str_external_order_id.c_str();
26 error.order_action_id = action_id; // 订单操作ID,
27 error.error_id = error_info.error_id; // 错误ID
28 error.error_msg = error_info.error_msg; // 错误信息
29 error.insert_time = time::now_in_nano(); // 写入时间
30 SPDLOG_DEBUG("OrderActionError: {}", error.to_string());
31 get_writer(order_state.dest)->close_data();
32} else {
33 OrderActionError error{};
34 error.order_id = order_state.data.order_id; // 订单ID
35 std::string str_external_order_id = std::to_string(cancel_info.order_xtp_id);
36 error.external_order_id = str_external_order_id.c_str();
37 error.order_action_id = action_id; // 订单操作ID,
38 error.error_id = error_info.error_id; // 错误ID
39 error.error_msg = error_info.error_msg; // 错误信息
40 error.insert_time = time::now_in_nano(); // 写入时间
41 SPDLOG_DEBUG("OrderActionError: {}", error.to_string());
42 try_write_to(error, order_state.dest);
43}
44return true;
45}
has_order_trigger
bool has_order_trigger(uint64_t trigger_id) const;
判断是否存在该trigger_id的预埋单
get_order_trigger
state<OrderTrigger> &get_order_trigger(uint64_t trigger_id);
根据trigger_id获取对应的预埋单, 如果不存在对应的预埋单会崩溃, 需要先调用has_order_trigger判断是否存在该预埋单撤
范例
1// ctp预埋单委托下单请求响应, 根据响应信息修改预埋单状态
2bool TraderCTP::custom_OnRspParkedOrderAction(const CThostFtdcParkedOrderActionField &ParkedOrderAction,
3 const CThostFtdcRspInfoField &RspInfo, int nRequestID, bool bIsLast) {
4SPDLOG_DEBUG("CThostFtdcParkedOrderActionField: {}", to_string(ParkedOrderAction));
5SPDLOG_DEBUG("CThostFtdcRspInfoField: {}", to_string(RspInfo));
6SPDLOG_DEBUG("nRequestID: {}, bIsLast: {}", nRequestID, bIsLast);
7
8auto trigger_id_iter = map_request_id_to_kf_action_id_.find(nRequestID);
9if (trigger_id_iter == map_request_id_to_kf_action_id_.end()) {
10 SPDLOG_ERROR("CANNOT FIND trigger_id of {} in map_request_id_to_kf_action_id_", nRequestID);
11 return false;
12}
13
14auto trigger_id = trigger_id_iter->second;
15if (not has_order_trigger(trigger_id)) {
16 SPDLOG_ERROR("CANNOT FIND tigger_id {} in triggers_", trigger_id);
17 return false;
18}
19
20auto &trigger_state = get_order_trigger(trigger_id);
21map_trigger_id_to_ParkedOrderID_.insert_or_assign(
22 trigger_id, std::pair<std::string, bool>{ParkedOrderAction.ParkedOrderActionID, true});
23map_ParkedOrderId_to_trigger_id_.insert_or_assign(ParkedOrderAction.ParkedOrderActionID, trigger_id);
24trigger_state.data.status = parked_status_to_trigger_status(ParkedOrderAction.Status);
25strncpy(trigger_state.data.external_trigger_id, ParkedOrderAction.ParkedOrderActionID,
26 strlen(ParkedOrderAction.ParkedOrderActionID));
27trigger_state.data.error_id = RspInfo.ErrorID;
28const std::string msg = gbk2utf8(RspInfo.ErrorMsg);
29strncpy(trigger_state.data.error_msg, msg.c_str(), msg.length());
30trigger_state.data.update_time = time::now_in_nano();
31
32if (RspInfo.ErrorID != 0) {
33 trigger_state.data.status = OrderStatus::Error;
34 SPDLOG_ERROR("failed to ReqParkedOrderAction, ErrorId: {} ErrorMsg: {}, parked_order_action: {}", RspInfo.ErrorID,
35 gbk2utf8(RspInfo.ErrorMsg), to_string(ParkedOrderAction));
36}
37
38try_write_to(trigger_state.data, trigger_state.dest);
39SPDLOG_DEBUG("OrderTrigger: {}", trigger_state.data.to_string());
40
41return true;
42}
has_order_trigger_action
bool has_order_trigger_action(uint64_t action_id) const;
判断是否存在该action_id的预埋单操作
get_order_trigger
state<OrderTriggerAction> &get_order_trigger_action(uint64_t action_id);
根据action_id获取对应的预埋单撤单操作, 如果不存在对应的预埋单撤单操作会崩溃, 需要先调用has_order_trigger_action判断是否存在该预埋单撤单操
now
int64_t now() const;
触发当前回调函数的journal数据的gen_time;
例如, 当前处于insert_order回调函数, now()的值为OrderInput数据的gen_time; 当前处于cancel_order回调函数, now()的值为OrderAction数据的gen_time;
不要在子线程中调用使用该函数, 在子线程中该值无意义.
now_in_nano
int64_t time::now_in_nano();
系统的当前实际时间, 与当前处于哪个回调函数无关, 主线程和子线程都用都表示的是当前的实际时间.
add_timer
int32_t add_timer(int64_t nanotime, const std::function<void(const event_ptr &)> &callback);
添加定时器, 当实际时间到nanotime时, 执行一次callback函数.
返回值是timer_id, 可以根据该值取消该定时器.
范例
1// ctp查询资金, 如果失败添加定时器2秒后再查一次, 直到查询成功
2void TraderCTP::add_timer_req_account(int64_t nano) {
3add_timer(nano, [&](const auto &event) {
4 CThostFtdcQryTradingAccountField req = {};
5 strcpy(req.BrokerID, config_.broker_id.c_str());
6 strcpy(req.InvestorID, config_.account_id.c_str());
7 SPDLOG_TRACE("CThostFtdcQryTradingAccountField: {}", to_string(req));
8 // std::this_thread::sleep_for(std::chrono::seconds(1));
9 int rtn = api_->ReqQryTradingAccount(&req, get_request_id());
10 SPDLOG_TRACE("ReqQryTradingAccount rtn {}", rtn);
11 if (rtn != 0) {
12 add_timer_req_account(time::now_in_nano() + 2 * time_unit::NANOSECONDS_PER_SECOND);
13 }
14});
15}
add_time_interval
int32_t add_time_interval(int64_t nanotime, const std::function<void(const event_ptr &)> &callback);
添加间隔定时器, 从现在开始, 每隔nanotime时间, 执行一次callback函数.
返回值是timer_id, 可以根据该值取消该定时器.
范例
1// ledger模块每隔一分钟, 通知所有TD查询资金和持仓
2if (sync_asset_) {
3 add_time_interval(time_unit::NANOSECONDS_PER_MINUTE,
4 [&](const event_ptr &e) { request_asset_sync(e->gen_time()); });
5}
6if (sync_position_) {
7 add_time_interval(time_unit::NANOSECONDS_PER_MINUTE,
8 [&](const event_ptr &e) { request_position_sync(e->gen_time()); });
9}
clear_timer
void clear_timer(int32_t timer_id);
根据timer_id取消由add_timer和and_time_interval生成的定时器.
request_deregister
void request_deregister()
申请停止进程, 调用该函数后, 在执行完当前回调函数后, 进程会停止.
推送处理
成交推处理
委托成交时, 会触发柜台API的成交推送函数, 需要根据推送的成交数据中的 柜台API委托号 找到Kungfu对应的Order, 然后生成Trade, 并且修改Order的状态.
针对于异步柜台, 可能会发生成交推送函数早于委托响应函数触发的情况, 此时还没有获得*柜台API委托号*, 无法找到Kungfu对应的Order, 此时需要先将委托暂存, 当收到委托响应后, 再将成交数据取出来处理.
重启TD时, 查询并恢复今日的成交数据, 可能会导致同一笔成交处理两次, 所以针对于已处理的成交数据做标记, 再次收到已经处理过的成交数据时, 不再处理.
范例
1// ctp的处理实现
2
3std::unordered_map<std::string, std::unordered_set<std::string>> map_ExchangeID_OrderSysID_to_TradeIDs_{}; // <交易所:订单号, set<已经处理过的成交编号>>
4std::unordered_map<std::string, std::vector<CThostFtdcTradeField>> map_ExchangeID_OrderSysID_to_CThostFtdcTradeFields_{}; // <交易所:订单号, vector<先于委托响应回来的成交回报>>
5std::unordered_map<std::string, uint64_t> map_ExchangeID_OrderSysID_to_kf_order_id_{}; // <交易所:订单号, kf_order_id>
6std::unordered_map<uint64_t, std::string> map_kf_order_id_to_OrderSysID_{}; // <kf_order_id, 订单号> 撤单用
7std::unordered_map<uint64_t, std::string> map_kf_order_id_to_OrderRef_{}; // <kf_order_id, OrderRef> 撤未提交到交易所的单用
8
9// 判断成交是否已经处理
10bool TraderCTP::has_dealt_trade(const CThostFtdcTradeField &ctp_trade) {
11 auto &dealt_TradeIDs = map_ExchangeID_OrderSysID_to_TradeIDs_
12 .try_emplace(make_ExchangeID_OrderSysID(ctp_trade.ExchangeID, ctp_trade.OrderSysID))
13 .first->second;
14 return dealt_TradeIDs.find(ctp_trade.TradeID) != dealt_TradeIDs.end();
15}
16
17// 记录已经处理过的成交
18void TraderCTP::add_TradeID(const CThostFtdcTradeField &ctp_trade) {
19 map_ExchangeID_OrderSysID_to_TradeIDs_
20 .try_emplace(make_ExchangeID_OrderSysID(ctp_trade.ExchangeID, ctp_trade.OrderSysID))
21 .first->second.insert(ctp_trade.TradeID);
22}
23
24// 成交推送处理
25bool TraderCTP::custom_OnRtnTrade(const CThostFtdcTradeField &ctp_trade) {
26 SPDLOG_DEBUG("CThostFtdcTradeField: {}", to_string(ctp_trade));
27 if (ctp_trade.Price < 1 || ctp_trade.Price == 2.2250738585072014e-308 || ctp_trade.Price == 1.7976931348623158e+308 ||
28 ctp_trade.Volume == -2147483648 || ctp_trade.Volume == 2147483647) {
29 SPDLOG_ERROR("Order Price too low, Not True ctp_trade.Price {}, *ctp_trade {}", ctp_trade.Price,
30 to_string(ctp_trade));
31 return false;
32 }
33
34 const std::string &str_ExchangeID_OrderSysID = make_ExchangeID_OrderSysID(ctp_trade.ExchangeID, ctp_trade.OrderSysID);
35 auto ExchangeID_OrderSysID_iter = map_ExchangeID_OrderSysID_to_kf_order_id_.find(str_ExchangeID_OrderSysID);
36 if (ExchangeID_OrderSysID_iter == map_ExchangeID_OrderSysID_to_kf_order_id_.end()) {
37 SPDLOG_WARN("CANNOT FIND {} in map_ExchangeID_OrderSysID_to_kf_order_id_, STORE CThostFtdcTradeField IN "
38 "map_ExchangeID_OrderSysID_to_CThostFtdcTradeFields_",
39 str_ExchangeID_OrderSysID);
40 map_ExchangeID_OrderSysID_to_CThostFtdcTradeFields_.try_emplace(str_ExchangeID_OrderSysID)
41 .first->second.push_back(ctp_trade); // 暂存先于委托响应收到的成交推送
42 return false;
43 }
44 deal_trade(ctp_trade);
45 return true;
46}
47
48// 委托响应和推送处理
49bool TraderCTP::custom_OnRtnOrder(const CThostFtdcOrderField &ctp_order) {
50 SPDLOG_DEBUG("CThostFtdcOrderField: {}", to_string(ctp_order));
51
52 const std::string str_ExchangeID_OrderLocalID =
53 make_ExchangeID_OrderSysID(ctp_order.ExchangeID, ctp_order.OrderLocalID);
54 const std::string str_ExchangeID_OrderSysID = make_ExchangeID_OrderSysID(ctp_order.ExchangeID, ctp_order.OrderSysID);
55 uint64_t orderRef_key = get_orderRef_key(ctp_order.FrontID, ctp_order.SessionID, ctp_order.OrderRef);
56 auto OrderRefKey_iter = map_OrderRefKey_to_kf_order_id_.find(orderRef_key);
57 auto ExchangeID_OrderSysID_iter = map_ExchangeID_OrderSysID_to_kf_order_id_.find(str_ExchangeID_OrderSysID);
58 auto ExchangeID_OrderLocalID_iter = map_ExchangeID_OrderSysID_to_kf_order_id_.find(str_ExchangeID_OrderLocalID);
59
60 // 系统外订单信息
61 if (OrderRefKey_iter == map_OrderRefKey_to_kf_order_id_.end() and
62 ExchangeID_OrderSysID_iter == map_ExchangeID_OrderSysID_to_kf_order_id_.end() and
63 ExchangeID_OrderLocalID_iter == map_ExchangeID_OrderSysID_to_kf_order_id_.end()) {
64 SPDLOG_WARN("external Order with ExchangeID: {}, OrderSysID: {}, OrderLocalID: {}", ctp_order.ExchangeID,
65 ctp_order.OrderSysID, ctp_order.OrderLocalID);
66 return generate_external_order(ctp_order);
67 }
68
69 uint64_t kf_order_id = 0;
70 if (strlen(ctp_order.OrderSysID) != 0 and
71 ExchangeID_OrderSysID_iter != map_ExchangeID_OrderSysID_to_kf_order_id_.end()) {
72 kf_order_id = ExchangeID_OrderSysID_iter->second;
73 } else if (OrderRefKey_iter != map_OrderRefKey_to_kf_order_id_.end()) {
74 kf_order_id = OrderRefKey_iter->second;
75 } else if (ExchangeID_OrderLocalID_iter != map_ExchangeID_OrderSysID_to_kf_order_id_.end()) {
76 kf_order_id = ExchangeID_OrderLocalID_iter->second;
77 } else {
78 SPDLOG_ERROR("invalidate CThostFtdcOrderField: {}", to_string(ctp_order));
79 return false;
80 }
81
82 if (strlen(ctp_order.OrderSysID) != 0) {
83 map_kf_order_id_to_OrderSysID_.insert_or_assign(kf_order_id, ctp_order.OrderSysID);
84 map_ExchangeID_OrderSysID_to_kf_order_id_.insert_or_assign(str_ExchangeID_OrderSysID, kf_order_id);
85 }
86
87 if (not has_order(kf_order_id)) {
88 SPDLOG_WARN("no order_id {} in orders_", kf_order_id);
89 return generate_external_order(ctp_order);
90 }
91
92 auto &order_state = get_order(kf_order_id);
93
94 if (is_final_status(order_state.data.status) and order_state.data.status != longfist::enums::OrderStatus::Lost and
95 order_state.data.status != longfist::enums::OrderStatus::Cancelling) {
96 return true;
97 }
98
99 from_ctp(ctp_order, order_state.data);
100 order_state.data.update_time = time::now_in_nano();
101 if (has_writer(order_state.dest)) {
102 write_to(order_state.data, order_state.dest);
103 } else {
104 ++try_write_to_order_count;
105 try_write_to(order_state.data, order_state.dest, [&]() {
106 --try_write_to_order_count;
107 try_ready();
108 });
109 }
110 try_deal_trade(str_ExchangeID_OrderSysID); // 处理暂存的委托
111 return true;
112}
113
114// 处理暂存的委托
115void TraderCTP::try_deal_trade(const std::string &str_ExchangeID_OrderSysID) {
116 SPDLOG_DEBUG("ExchangeID_OrderSysID: {}", str_ExchangeID_OrderSysID);
117 auto &ctp_trades =
118 map_ExchangeID_OrderSysID_to_CThostFtdcTradeFields_.try_emplace(str_ExchangeID_OrderSysID).first->second;
119 for (const CThostFtdcTradeField &ctp_trade : ctp_trades) {
120 deal_trade(ctp_trade);
121 }
122 ctp_trades.clear();
123}
124
125// 生成Trade
126void TraderCTP::deal_trade(const CThostFtdcTradeField &ctp_trade) {
127 SPDLOG_DEBUG("CThostFtdcTradeField: {}", to_string(ctp_trade));
128 const std::string &str_ExchangeID_OrderSysID = make_ExchangeID_OrderSysID(ctp_trade.ExchangeID, ctp_trade.OrderSysID);
129 auto ExchangeID_OrderSysID_iter = map_ExchangeID_OrderSysID_to_kf_order_id_.find(str_ExchangeID_OrderSysID);
130 if (ExchangeID_OrderSysID_iter == map_ExchangeID_OrderSysID_to_kf_order_id_.end()) {
131 SPDLOG_ERROR("CANNOT FIND {} in map_ExchangeID_OrderSysID_to_kf_order_id_", str_ExchangeID_OrderSysID);
132 return;
133 }
134 uint64_t kf_order_id = ExchangeID_OrderSysID_iter->second;
135 if (not has_order(kf_order_id)) {
136 SPDLOG_ERROR("no order_id {} in orders_", kf_order_id);
137 return;
138 }
139
140 // 判断委托是否已经处理过
141 if (has_dealt_trade(ctp_trade)) {
142 SPDLOG_WARN("trade: [{}:{}] HAS DEALT, DO NOT DEAL SECOND TIME", str_ExchangeID_OrderSysID, ctp_trade.TradeID);
143 return;
144 }
145 add_TradeID(ctp_trade); // 记录已处理委托
146
147 auto &order_state = get_order(kf_order_id);
148 // 生成Trade
149 if (has_writer(order_state.dest)) {
150 auto writer = get_writer(order_state.dest);
151 Trade &trade = writer->open_data<Trade>(0);
152 from_ctp(ctp_trade, trade);
153 uint64_t trade_id = writer->current_frame_uid();
154 trade.trade_id = trade_id;
155 trade.order_id = order_state.data.order_id;
156 SPDLOG_DEBUG("Trade: {}", trade.to_string());
157 writer->close_data();
158 } else {
159 Trade trade{};
160 from_ctp(ctp_trade, trade);
161 uint64_t trade_id = get_public_writer()->current_frame_uid() xor (time::now_in_nano() & 0x0000FFFF);
162 trade.trade_id = trade_id;
163 trade.order_id = order_state.data.order_id;
164 SPDLOG_DEBUG("Trade: {}", trade.to_string());
165 ++try_write_to_trade_count;
166 try_write_to(trade, order_state.dest, [&]() {
167 --try_write_to_trade_count;
168 try_ready();
169 });
170 }
171}
系统外委托和成交推送处理
当同一个交易账户在多个客户端同时登陆时, Kungfu客户端会收到其他客户端提交的委托的推送信息, 可以根据个人需求选择是否要处理系统外的委托和成交.
范例
1// ctp处理实现
2
3// 委托响应和推送处理
4bool TraderCTP::custom_OnRtnOrder(const CThostFtdcOrderField &ctp_order) {
5 SPDLOG_DEBUG("CThostFtdcOrderField: {}", to_string(ctp_order));
6
7 const std::string str_ExchangeID_OrderLocalID =
8 make_ExchangeID_OrderSysID(ctp_order.ExchangeID, ctp_order.OrderLocalID);
9 const std::string str_ExchangeID_OrderSysID = make_ExchangeID_OrderSysID(ctp_order.ExchangeID, ctp_order.OrderSysID);
10 uint64_t orderRef_key = get_orderRef_key(ctp_order.FrontID, ctp_order.SessionID, ctp_order.OrderRef);
11 auto OrderRefKey_iter = map_OrderRefKey_to_kf_order_id_.find(orderRef_key);
12 auto ExchangeID_OrderSysID_iter = map_ExchangeID_OrderSysID_to_kf_order_id_.find(str_ExchangeID_OrderSysID);
13 auto ExchangeID_OrderLocalID_iter = map_ExchangeID_OrderSysID_to_kf_order_id_.find(str_ExchangeID_OrderLocalID);
14
15 // 系统外订单信息
16 if (OrderRefKey_iter == map_OrderRefKey_to_kf_order_id_.end() and
17 ExchangeID_OrderSysID_iter == map_ExchangeID_OrderSysID_to_kf_order_id_.end() and
18 ExchangeID_OrderLocalID_iter == map_ExchangeID_OrderSysID_to_kf_order_id_.end()) {
19 SPDLOG_WARN("external Order with ExchangeID: {}, OrderSysID: {}, OrderLocalID: {}", ctp_order.ExchangeID,
20 ctp_order.OrderSysID, ctp_order.OrderLocalID);
21 return generate_external_order(ctp_order); // 处理系统外委托
22 }
23
24 uint64_t kf_order_id = 0;
25 if (strlen(ctp_order.OrderSysID) != 0 and
26 ExchangeID_OrderSysID_iter != map_ExchangeID_OrderSysID_to_kf_order_id_.end()) {
27 kf_order_id = ExchangeID_OrderSysID_iter->second;
28 } else if (OrderRefKey_iter != map_OrderRefKey_to_kf_order_id_.end()) {
29 kf_order_id = OrderRefKey_iter->second;
30 } else if (ExchangeID_OrderLocalID_iter != map_ExchangeID_OrderSysID_to_kf_order_id_.end()) {
31 kf_order_id = ExchangeID_OrderLocalID_iter->second;
32 } else {
33 SPDLOG_ERROR("invalidate CThostFtdcOrderField: {}", to_string(ctp_order));
34 return false;
35 }
36
37 if (strlen(ctp_order.OrderSysID) != 0) {
38 map_kf_order_id_to_OrderSysID_.insert_or_assign(kf_order_id, ctp_order.OrderSysID);
39 map_ExchangeID_OrderSysID_to_kf_order_id_.insert_or_assign(str_ExchangeID_OrderSysID, kf_order_id);
40 }
41
42 if (not has_order(kf_order_id)) {
43 SPDLOG_WARN("no order_id {} in orders_", kf_order_id);
44 return generate_external_order(ctp_order);
45 }
46
47 auto &order_state = get_order(kf_order_id);
48
49 if (is_final_status(order_state.data.status) and order_state.data.status != longfist::enums::OrderStatus::Lost and
50 order_state.data.status != longfist::enums::OrderStatus::Cancelling) {
51 return true;
52 }
53
54 from_ctp(ctp_order, order_state.data);
55 order_state.data.update_time = time::now_in_nano();
56 if (has_writer(order_state.dest)) {
57 write_to(order_state.data, order_state.dest);
58 } else {
59 ++try_write_to_order_count;
60 try_write_to(order_state.data, order_state.dest, [&]() {
61 --try_write_to_order_count;
62 try_ready();
63 });
64 }
65 try_deal_trade(str_ExchangeID_OrderSysID); // 处理暂存的委托
66 return true;
67}
68
69// 根据系统外的委托推送信息生成Kunfu的Order
70bool TraderCTP::generate_external_order(const CThostFtdcOrderField &ctp_order) {
71 SPDLOG_DEBUG("CThostFtdcOrderField: {}", to_string(ctp_order));
72
73 if (not config_.sync_external_order) {
74 return false;
75 }
76
77 auto nano = time::now_in_nano();
78 auto writer_order = get_public_writer();
79 Order &order = writer_order->open_data<Order>(now());
80 order.order_id = writer_order->current_frame_uid();
81 from_ctp(ctp_order, order);
82 order.insert_time = nsec_from_ctp_time(ctp_order.InsertDate, ctp_order.InsertTime);
83 if (order.insert_time > nano) {
84 order.insert_time -= time_unit::NANOSECONDS_PER_DAY;
85 }
86 order.update_time = nano;
87 writer_order->close_data();
88 SPDLOG_DEBUG("Order: {}", order.to_string());
89
90 uint64_t orderRef_key = get_orderRef_key(ctp_order.FrontID, ctp_order.SessionID, ctp_order.OrderRef);
91 map_OrderRefKey_to_kf_order_id_.insert_or_assign(orderRef_key, uint64_t(order.order_id));
92
93 const std::string str_ExchangeID_OrderSysID =
94 strlen(ctp_order.OrderSysID) == 0 ? make_ExchangeID_OrderSysID(ctp_order.ExchangeID, ctp_order.OrderLocalID)
95 : make_ExchangeID_OrderSysID(ctp_order.ExchangeID, ctp_order.OrderSysID);
96 if (strlen(ctp_order.OrderSysID) != 0) {
97 map_ExchangeID_OrderSysID_to_kf_order_id_.insert_or_assign(str_ExchangeID_OrderSysID, uint64_t(order.order_id));
98 map_kf_order_id_to_OrderSysID_.insert_or_assign(uint64_t(order.order_id), ctp_order.OrderSysID);
99 }
100 try_deal_trade(str_ExchangeID_OrderSysID); // 处理暂存的成交
101 return true;
102}
备注
如果系统外的委托推送先于成交推送到达, 处理完系统外委托后, 成交推送就可以当做系统内正常顺序的成交推送处理.
如果系统外的成交推送先于委托推送到达, 成交信息会暂存, 处理完系统外委托后, 会和处理系统内暂存的成交信息一样处理.
行情源对接
行情接口基类
1class MarketData : public BrokerService {
2public:
3 explicit MarketData(BrokerVendor &vendor) : BrokerService(vendor) {}
4
5 // 策略或前端调用subscribe订阅指定标的行情时会触发该回调, 用于根据交易所和标的号订阅行情
6 virtual bool subscribe(const std::vector<longfist::types::InstrumentKey> &instrument_keys);
7
8 // subscribe_custom默认实现会调用该回调, 用于订阅行情源全市场行情
9 virtual bool subscribe_all();
10
11 // 策略调用subscribe_all会触发该回调, 用于根据自定义的行情类型, 标的类型, 逐笔类型进行全市场订阅; 默认实现为调用MarketData::subscribe_all
12 virtual bool subscribe_custom(const longfist::types::CustomSubscribe &custom_sub);
13
14 // 保留接口
15 virtual bool unsubscribe(const std::vector<longfist::types::InstrumentKey> &instrument_keys);
16
17 // 收到master广播的Band数据会触发该回调, 用于订阅band信道
18 virtual void on_band(const event_ptr &event);
19
20protected:
21 [[nodiscard]] bool has_instrument(const std::string &instrument_id) const;
22
23 [[nodiscard]] const longfist::types::Instrument &get_instrument(const std::string &instrument_id) const;
24
25 void update_instrument(longfist::types::Instrument instrument);
26
27 void try_subscribe();
28
29 void add_instrument_key(const longfist::types::InstrumentKey &key);
30
31 std::unordered_map<std::string, longfist::types::Instrument> instruments_ = {};
32 std::vector<longfist::types::InstrumentKey> instruments_to_subscribe_ = {};
33};
主要回调接口
subscribe
virtual bool subscribe(const std::vector<InstrumentKey> &instrument_keys);
用于根据交易所和标的号订阅行情
策略或前端订阅指定标的时, 订阅的标的会暂存在 std::vector<InstrumentKey> instruments_to_subscribe_
,
每隔一秒会检查 instruments_to_subscribe_.empty()
,
如果非空则调用 subscribe(instruments_to_subscribe_)
,
结束后执行 instruments_to_subscribe_.clear()
清空列表.
参数
参数 |
类型 |
说明 |
instrument_keys |
const std::vector<InstrumentKey> & |
订阅的标的列表 |
返回值
类型 |
说明 |
bool |
订阅成功返回true, 订阅失败返回false |
范例
1// ctp的subscribe实现
2bool MarketDataCTP::subscribe(const std::vector<InstrumentKey> &instruments) {
3 auto length = instruments.size();
4 auto targets = new char *[length];
5 for (int i = 0; i < length; i++) {
6 targets[i] = const_cast<char *>(instruments[i].instrument_id.value);
7 }
8 auto rtn = api_->SubscribeMarketData(targets, length);
9 delete[] targets;
10 return rtn == 0;
11}
subscribe_all
virtual bool subscribe_all();
用于订阅全市场行情
参数
参数 |
类型 |
说明 |
无 |
无 |
无 |
返回值
类型 |
说明 |
bool |
订阅成功返回true, 订阅失败返回false |
范例
1// xtp的subscribe_all实现
2bool subscribe_all() override {
3 auto result = api_->SubscribeAllMarketData() && api_->SubscribeAllTickByTick();
4 SPDLOG_INFO("subscribe all, rtn code {}", result);
5 return result;
6}
subscribe_custom
virtual bool subscribe_custom(const longfist::types::CustomSubscribe &custom_sub);
用于根据自定义的行情类型, 标的类型, 逐笔类型进行全市场订阅; 默认实现为调用 MarketData::subscribe_all
参数
参数 |
类型 |
说明 |
custom_sub |
CustomSubscribe |
自定义交易市场, |
返回值
类型 |
说明 |
bool |
订阅成功返回true, 订阅失败返回false |
unsubscribe
virtual bool unsubscribe(const std::vector<longfist::types::InstrumentKey> &instrument_keys);
最初设计时是为了方面取消订阅指定的标的行情, 但是在多策略场景下, 一个策略调用取消订阅了指定标的后, 会导致订阅了同样标的的其他策略无法收到行情, 故而不建议使用.
on_band
virtual void on_band(const event_ptr &event);
当有一个进程调用request_band创建一个写入信道时, master会广播这条信道的信息, 所有的进程都会收到一个Band信息, 可以在on_band中选择是否要订阅这条信道.
一般场景下不会用到该接口.
主要主调接口
request_band
uint32_t request_band(const std::string &band_name, uint64_t page_size = 0);
申请一个band信道, 返回结果为band对应的location_uid, page_size为journal每一页的大小.
范例
1// xtp行情在pre_start中申请两个大小为256MB的信道, 用于写入逐笔委托和逐笔成交行情
2void MarketDataXTP::pre_start() {
3 entrust_band_uid_ = request_band("market-data-band-entrust", 256);
4 transaction_band_uid_ = request_band("market-data-band-transaction", 256);
5}
update_broker_state
void update_broker_state(BrokerState state);
修改TD的状态
范例
1// xtp在行情登录成功后将MD状态设置为Ready
2void MarketDataXTP::on_start() {
3 MDConfiguration config = nlohmann::json::parse(get_config());
4 if (config.client_id < 1 or config.client_id > 99) {
5 SPDLOG_ERROR("client_id must between 1 and 99");
6 }
7 auto md_ip = config.md_ip.c_str();
8 auto account_id = config.account_id.c_str();
9 auto password = config.password.c_str();
10 auto protocol_type = get_xtp_protocol_type(config.protocol);
11 std::string runtime_folder = get_runtime_folder();
12 SPDLOG_INFO("Connecting XTP MD for {} at {}://{}:{}", account_id, config.protocol, md_ip, config.md_port);
13 api_ =
14 XTP::API::QuoteApi::CreateQuoteApi(config.client_id, runtime_folder.c_str(), XTP_LOG_LEVEL::XTP_LOG_LEVEL_INFO);
15 if (config.protocol == "udp") {
16 api_->SetUDPBufferSize(config.buffer_size);
17 }
18 api_->RegisterSpi(this);
19 if (api_->Login(md_ip, config.md_port, account_id, password, protocol_type) == 0) {
20 update_broker_state(BrokerState::LoggedIn);
21 update_broker_state(BrokerState::Ready);
22 SPDLOG_INFO("login success! (account_id) {}", config.account_id);
23 if (config.query_instruments and not check_if_stored_instruments(time::strfnow("%Y%m%d"))) {
24 api_->QueryAllTickers(XTP_EXCHANGE_SH);
25 api_->QueryAllTickers(XTP_EXCHANGE_SZ);
26 api_->QueryAllTickersFullInfo(XTP_EXCHANGE_SH);
27 api_->QueryAllTickersFullInfo(XTP_EXCHANGE_SZ);
28 }
29 } else {
30 update_broker_state(BrokerState::LoginFailed);
31 SPDLOG_ERROR("failed to login, [{}] {}", api_->GetApiLastError()->error_id, api_->GetApiLastError()->error_msg);
32 }
33}
has_band_writer
bool has_band_writer(uint32_t dest_id) const;
判断是否有写入到band的writer
get_band_writer
yijinjing::journal::writer_ptr get_band_writer(uint32_t dest_id) const;
获取写入到Bnad的writer
范例
1// 收到逐笔推送时, 判断是否存在写入到Band的writer, 然后获取对应的writer赋值给对应变量, 下次收到行情推送时可以直接使用writer, 减少对map的访问
2void MarketDataXTP::OnTickByTick(XTPTBT *tbt_data) {
3 if (tbt_data->type == XTP_TBT_ENTRUST) {
4 if (tbt_data->entrust.ord_type == 'D') {
5 if (not transaction_band_writer_) {
6 if (not has_band_writer(transaction_band_uid_)) {
7 SPDLOG_INFO("band writer for market-data-band-transaction not ready");
8 return;
9 }
10 transaction_band_writer_ = get_band_writer(transaction_band_uid_);
11 }
12 Transaction &transaction = transaction_band_writer_->open_data<Transaction>(0);
13 from_xtp(*tbt_data, transaction);
14 transaction_band_writer_->close_data();
15 } else {
16 if (not entrust_band_writer_) {
17 if (not has_band_writer(entrust_band_uid_)) {
18 SPDLOG_INFO("band writer for market-data-band-entrust not ready");
19 return;
20 }
21 entrust_band_writer_ = get_band_writer(entrust_band_uid_);
22 }
23 Entrust &entrust = entrust_band_writer_->open_data<Entrust>(0);
24 from_xtp(*tbt_data, entrust);
25 entrust_band_writer_->close_data();
26 }
27 } else if (tbt_data->type == XTP_TBT_TRADE) {
28 if (not transaction_band_writer_) {
29 if (not has_band_writer(transaction_band_uid_)) {
30 SPDLOG_INFO("band writer for market-data-band-transaction not ready");
31 return;
32 }
33 transaction_band_writer_ = get_band_writer(transaction_band_uid_);
34 }
35 Transaction &transaction = transaction_band_writer_->open_data<Transaction>(0);
36 from_xtp(*tbt_data, transaction);
37 transaction_band_writer_->close_data();
38 }
39}
get_public_writer
writer_ptr &get_public_writer()
dest为0的writer单独使用一个变量存放, 调用该接口直接返回写入dest为0的writer, 效果等价于get_writer(0), 区别是该接口不涉及stl容器访问, 可以在子线程调用
open_data<T>
template <typename T> std::enable_if_t<size_fixed_v<T>, T &> open_data(int64_t trigger_time = 0);
该接口是属于writer的接口, 用于生成Order, Trade 等数据使用
close_data
void close_data(int64_t gen_time = time::now_in_nano());
该接口是属于writer的接口, 用于在open_data完成数据写入后, 标记写入完成;
open_data和close_data必须配对使用, 同一个writer中间不可以进行嵌套
范例
1// xtp在收到快照行情推送时, 将数据转换成Quote写到PUBLIC进行广播
2void MarketDataXTP::OnQueryAllTickers(XTPQSI *ticker_info, XTPRI *error_info, bool is_last) {
3 if (nullptr != error_info && error_info->error_id != 0) {
4 SPDLOG_ERROR("error_id : {} , error_msg : {}", error_info->error_id, error_info->error_msg);
5 return;
6 }
7
8 if (nullptr == ticker_info) {
9 SPDLOG_ERROR("ticker_info is nullptr");
10 return;
11 }
12
13 Instrument &instrument = get_public_writer()->open_data<Instrument>(0);
14 from_xtp(ticker_info, instrument);
15 get_public_writer()->close_data();
16}
行情推送处理
快照行情写入
当行情源API的快照回调虚函数触发时, 需要将行情源的快照数据转换成kungfu::longfist::types::Quote数据结构, 快照数据可以直接写入到PUBLIC信道里, 具体操作方式参考以下XTP范例.
范例
1// 交易所转换
2inline void from_xtp(const XTP_MARKET_TYPE &xtp_market_type, char *exchange_id) {
3 if (xtp_market_type == XTP_MKT_SH_A) {
4 strcpy(exchange_id, "SSE");
5 } else if (xtp_market_type == XTP_MKT_SZ_A) {
6 strcpy(exchange_id, "SZE");
7 }
8}
9
10// xtp的Quote数据结构转换实现
11inline void from_xtp(const XTPMarketDataStruct &ori, Quote &des) {
12 des.data_time = nsec_from_xtp_timestamp(ori.data_time);
13 des.instrument_id = ori.ticker;
14 from_xtp(ori.exchange_id, des.exchange_id);
15
16 des.instrument_type = ori.data_type != XTP_MARKETDATA_OPTION ? get_instrument_type(des.exchange_id, des.instrument_id)
17 : InstrumentType::StockOption;
18
19 des.last_price = ori.last_price;
20 des.pre_settlement_price = ori.pre_settl_price;
21 des.pre_close_price = ori.pre_close_price;
22 des.open_price = ori.open_price;
23 des.high_price = ori.high_price;
24 des.low_price = ori.low_price;
25 des.volume = ori.qty;
26 des.turnover = ori.turnover;
27 des.close_price = ori.close_price;
28 des.settlement_price = ori.settl_price;
29 des.upper_limit_price = ori.upper_limit_price;
30 des.lower_limit_price = ori.lower_limit_price;
31 des.total_trade_num = ori.trades_count;
32
33 memcpy(des.ask_price, ori.ask, sizeof(des.ask_price));
34 memcpy(des.bid_price, ori.bid, sizeof(des.ask_price));
35 for (std::size_t i = 0; i < 10; i++) {
36 des.ask_volume[i] = ori.ask_qty[i];
37 des.bid_volume[i] = ori.bid_qty[i];
38 }
39}
40
41void MarketDataXTP::OnDepthMarketData(XTPMD *market_data, int64_t *bid1_qty, int32_t bid1_count, int32_t max_bid1_count,
42 int64_t *ask1_qty, int32_t ask1_count, int32_t max_ask1_count) {
43 if (nullptr == market_data) {
44 SPDLOG_ERROR("XTPMD is nullptr");
45 }
46
47 Quote "e = get_public_writer()->open_data<Quote>(0);
48 from_xtp(*market_data, quote);
49 get_public_writer()->close_data();
50}
逐笔行情写入
当行情源API的快照回调虚函数触发时, 需要将行情源的逐笔委托行情数据转换成 kungfu::longfist::types::Entrust 数据结构, 需要将行情源的逐笔成交行情数据转换成 kungfu::longfist::types::Transaction 数据结构.
建议分别为Entrust和Transaction分别创建各自的band的信道, 具体操作参考以下XTP范例.
范例
1// 声明两个线程私有writer变量
2class MarketDataXTP : public XTP::API::QuoteSpi, public broker::MarketData {
3// ... 其他内容
4private:
5 inline static thread_local yijinjing::journal::writer_ptr entrust_band_writer_ = nullptr;
6 inline static thread_local yijinjing::journal::writer_ptr transaction_band_writer_ = nullptr;
7};
8
9// pre_start 申请创建对应的band信道
10void MarketDataXTP::pre_start() {
11 entrust_band_uid_ = request_band("market-data-band-entrust", 256);
12 transaction_band_uid_ = request_band("market-data-band-transaction", 256);
13}
14
15// 交易所转换
16inline void from_xtp(const XTP_MARKET_TYPE &xtp_market_type, char *exchange_id) {
17 if (xtp_market_type == XTP_MKT_SH_A) {
18 strcpy(exchange_id, "SSE");
19 } else if (xtp_market_type == XTP_MKT_SZ_A) {
20 strcpy(exchange_id, "SZE");
21 }
22}
23
24// 逐笔委托转换
25inline void from_xtp(const XTPTickByTickStruct &ori, Entrust &des) {
26 from_xtp(ori.exchange_id, des.exchange_id);
27 des.instrument_id = ori.ticker;
28 des.data_time = nsec_from_xtp_timestamp(ori.data_time);
29
30 des.price = ori.entrust.price;
31 des.volume = ori.entrust.qty;
32 des.main_seq = ori.entrust.channel_no;
33 des.seq = ori.entrust.seq;
34
35 if (ori.entrust.ord_type == '1') {
36 des.price_type = PriceType::Any;
37 } else if (ori.entrust.ord_type == '2') {
38 des.price_type = PriceType::Limit;
39 } else if (ori.entrust.ord_type == 'U') {
40 des.price_type = PriceType::ForwardBest;
41 }
42
43 // xtp(深交所的order_no在xtp接口注释标注为无意义,偶尔为0 seq对应的是真正的订单号 上交所的order_no是订单号)
44 if (strcmp(des.exchange_id, "SSE")) {
45 des.orig_order_no = ori.entrust.order_no;
46 } else {
47 des.orig_order_no = ori.entrust.seq;
48 }
49
50 switch (ori.entrust.side) {
51 case 'B': {
52 des.side = Side::Buy;
53 break;
54 }
55 case 'S': {
56 des.side = Side::Sell;
57 break;
58 }
59 case '1': {
60 des.side = Side::Buy;
61 break;
62 }
63 case '2': {
64 des.side = Side::Sell;
65 break;
66 }
67 default: {
68 des.side = Side::Unknown;
69 break;
70 }
71 }
72}
73
74
75
76
77// XTP行情源的逐笔行情回调
78void MarketDataXTP::OnTickByTick(XTPTBT *tbt_data) {
79 if (tbt_data->type == XTP_TBT_ENTRUST) {
80 if (tbt_data->entrust.ord_type == 'D') {
81 if (not transaction_band_writer_) {
82 if (not has_band_writer(transaction_band_uid_)) {
83 return;
84 }
85 transaction_band_writer_ = get_band_writer(transaction_band_uid_);
86 }
87 Transaction &transaction = transaction_band_writer_->open_data<Transaction>(0);
88 from_xtp(*tbt_data, transaction);
89 transaction_band_writer_->close_data();
90 } else {
91 if (not entrust_band_writer_) {
92 if (not has_band_writer(entrust_band_uid_)) {
93 return;
94 }
95 entrust_band_writer_ = get_band_writer(entrust_band_uid_);
96 }
97 Entrust &entrust = entrust_band_writer_->open_data<Entrust>(0);
98 from_xtp(*tbt_data, entrust);
99 entrust_band_writer_->close_data();
100 }
101 } else if (tbt_data->type == XTP_TBT_TRADE) {
102 if (not transaction_band_writer_) {
103 if (not has_band_writer(transaction_band_uid_)) {
104 return;
105 }
106 transaction_band_writer_ = get_band_writer(transaction_band_uid_);
107 }
108 Transaction &transaction = transaction_band_writer_->open_data<Transaction>(0);
109 from_xtp(*tbt_data, transaction);
110 transaction_band_writer_->close_data();
111 }
112}