柜台对接

前置注意事项

本节主要介绍以下内容,

  1. Kungfu共享内存结构, 理解该结构有助于处理柜台的参数 kungfu::event_ptr&

  2. kungfu::event_ptr 的内容和使用方式

  3. 柜台回调函数的多线程处理, 通过journal把数据转到主线程处理, 避免加锁, 同时支持回放

kungfu共享内存结构

kungfu框架中进程间通信采用共享内存队列的形式, 一个进程将数据写入共享内存文件, 其他多个进程可以同时读取.

一条信道对应一个逻辑上的journal文件, 表示逻辑上一个无限大的队列.

具体实现时将一个journal切分为多个固定大小的page, 只要磁盘足够大就可以无限的增加page的个数, 每一个page文件的命名规则为 “dest_id的十六进制进程.page数.journal”.

存放在写入进程的目录下, 实盘模式下写入进程的journal所属目录命名规则为 “$KF_HOME/runtime/journal/{category}/{group}/{name}/{mode}”.

范例

1$KF_HOME/runtime/journal/td/CTP/089270/live, 表示实盘模式的CTP柜台对应账户为089270的共享内存文件目录

每一个page存放具体的数据, 每一个数据视为一个frame, frame由frame_header和data组成.

journal的结构如下图所示,

_images/journal.png

kungfu::event_ptr

在kungfu系统中, 所有的”typename_ptr”都实际表示std::shared_ptr<typename>, 即kungfu::event_ptr表示std::shared_ptr<kungfu::event>.

当A进程往共享内存里写入一个数据时, 会生成一个frame, 里面包含了frame_header和具体的数据内容, 当B进程读到该数据时, 也会将frame_header和后面的数据封装成一个frame, kungfu::event_ptr就是指向这个frame的指针, B进程会根据frame_header里面的数据类型msg_type触发不同的函数.

例如当策略调用下单时, 会往共享内存里写入一个OrderInput数据, TD读取到这个数据时, 会把frame_header和OrderInput封装在一个frame里, 然后调用虚函数insert_order函数, 并且把指向这个frame的指针kungfu::event_ptr作为参数传入, 在override的insert_order函数里通过调用event的对应函数, 可以获取到这个frame里面的frame_header和具体数据OrderInput的信息.

范例

 1//ctp的insert_order实现
 2bool TraderCTP::insert_order(const event_ptr &event) {
 3    const OrderInput &input = event->data<OrderInput>();
 4    SPDLOG_DEBUG("OrderInput: {}", input.to_string());
 5
 6    int request_id = get_request_id();
 7    int error_id = 0;
 8    uint64_t orderRef_key = 0;
 9
10    CThostFtdcInputOrderField ctp_input{};
11    to_ctp(ctp_input, input);
12    strcpy(ctp_input.BrokerID, config_.broker_id.c_str());
13    strcpy(ctp_input.InvestorID, config_.account_id.c_str());
14    strcpy(ctp_input.OrderRef, std::to_string(++order_ref_).c_str());
15    SPDLOG_DEBUG("CThostFtdcInputOrderField: {}", to_string(ctp_input));
16    error_id = api_->ReqOrderInsert(&ctp_input, request_id);
17    orderRef_key = get_orderRef_key(front_id_, session_id_, ctp_input.OrderRef);
18
19    auto nano = time::now_in_nano();
20    auto writer = get_writer(event->source());
21    Order &order = writer->open_data<Order>(event->gen_time());
22    order_from_input(input, order);
23    order.insert_time = nano;
24    order.update_time = nano;
25    order.time_condition = from_ctp_time_condition(ctp_input.TimeCondition);
26
27    if (error_id != 0) {
28        order.error_id = error_id;
29        order.status = OrderStatus::Error;
30    }
31
32    SPDLOG_DEBUG("Order: {}", order.to_string());
33    writer->close_data();
34    map_request_id_to_kf_order_id_.insert_or_assign(request_id, uint64_t(input.order_id));
35    map_OrderRefKey_to_kf_order_id_.insert_or_assign(orderRef_key, uint64_t(input.order_id));
36    map_kf_order_id_to_OrderRef_.insert_or_assign(input.order_id, ctp_input.OrderRef);
37    return error_id == 0;
38}


柜台回调多线程处理

一般情况下, 我们主动调用柜台API接口发送数据, 和柜台SPI回调接口是在两个不同的线程, 有的柜台API接口响应函数和数据推送函数是不同的线程, 此时是有三个线程.

对于TD我们提交委托和收到委托响应, 以及收到成交推送时, 都需要访问map <柜台委托号, Kungfu委托号> 来确定成交和委托的映射关系, 一般情况下都会选择使用加锁的方式来保证线程安全.

例如, 针对于异步下单的场景, 我们需要先构建 <下单跟踪信息, Kungfu委托号> map映射关系, 在委托报单响应回调里面访问这个map再构建 <API委托号, Kungfu委托号> 映射关系, 当收到成交推送的时候才能找到这笔成交属于Kungfu的哪一笔委托, 此时如果不对相关的容器进行加锁, 最容易遇到的情况就是一个线程插入数据导致stl扩容, 原来的内存地址失效, 此时另一个线程正在访问原来的内存地址, 此时会导致整个进程崩溃.

虽然通过加锁可以解决TD这一层的线程安全问题, 但是我们整个Kungfu框架的底层设计是针对于多进程单线程设计的, 底层的容器的访问绝大部分都没有加锁, API子线程和Kungfu主线程同时访问同一个容器时, 仍然存在线程安全问题 例如, 当柜台API成交推送在子线程推送成交数据时, 在子线程里调用get_writer(策略A)去把成交信息写给策略A, 而此时有策略B启动申请和TD通信, 主线程刚好需要添加TD写给策略B的writer, 此时有可能会导致get_writer(策略A)访问的容器触发扩容内存失效, 导致内存崩溃, 虽然这种概率非常小, 但是也存在风险.

为了规避多线程安全问题, 以及防止频繁的加锁解锁带来的性能损耗, 我们统一将TD子线程推送的数据通过共享内存传输给主线程处理. 其实现方式为, 定义一个 inline static thread_local journal::writer_ptr thread_writer_; 给每一个子线程, 调用 get_thread_writer() 会获取属于当前线程的writer, Kungfu底层会做好主线程读取的处理. 当子线程回调触发时, 调用该函数, 直接将数据原原本本的写入到共享内存, 主线程读取到后再做后续的业务处理, 这样做的好处有两个,

  1. 所有的容器访问都在主线程, 不需要加锁

  2. 柜台APi的原始数据全部落地保存, 可以随时使用回放功能查看柜台原始数据

针对于共享内存中的每一个数据, Kugnfu会针对于frame_header里的msg_type来执行不同处理函数, 使用thread_writer将数据从子线程转到主线程处理时, 需要自己针对于不同的柜台API数据定义msg_type, 在TD进程中, 所有不属于Kungfu系统定义的msg_type都会执行 virtual bool on_custom_event(const event_ptr &event); 回调. 在override中再分配针对于自定义msg_type的处理.

柜台范例, XTP的Order和Trade的回调处理

  1// 建议使用大于一百万的整型数作为自定义的msg_type
  2static constexpr int32_t kXTPOrderInfoType = 12340001;
  3static constexpr int32_t kXTPTradeReportType = 12340002;
  4static constexpr int32_t kQueryXTPOrderInfoType = 12340003;
  5static constexpr int32_t kQueryXTPTradeReportType = 12340004;
  6static constexpr int32_t kCancelOrderErrorType = 12340005;
  7static constexpr int32_t kQueryAssetType = 12340011;
  8static constexpr int32_t kQueryPositionType = 12340012;
  9
 10// 自定义msg_type数据回调, 根据对应msg_type执行对应的处理
 11bool TraderXTP::on_custom_event(const event_ptr &event) {
 12    SPDLOG_DEBUG("msg_type: {}", event->msg_type());
 13    switch (event->msg_type()) {
 14    case kXTPOrderInfoType:
 15        return custom_OnOrderEvent(event);
 16    case kXTPTradeReportType:
 17        return custom_OnTradeEvent(event);
 18    case kQueryXTPOrderInfoType:
 19        return custom_OnQueryOrder(event);
 20    case kQueryXTPTradeReportType:
 21        return custom_OnQueryTrade(event);
 22    case kCancelOrderErrorType:
 23        return custom_OnCancelOrderError(event);
 24    case kQueryAssetType:
 25        return custom_OnQueryAsset(event);
 26    case kQueryPositionType:
 27        return custom_OnQueryPosition(event);
 28    default:
 29        SPDLOG_ERROR("unrecognized msg_type: {}", event->msg_type());
 30        return false;
 31    }
 32}
 33
 34// 对于回调参数为多个的回调接口, 需要定义一个struct一起存放
 35// 成交推送和查询历史成交的数据结构相似, 共用一个struct, 多出来的字段查询历史成交用的
 36struct BufferXTPTradeReport {
 37    XTPQueryTradeRsp trade_info;
 38    XTPRI error_info;
 39    int request_id;
 40    bool is_last;
 41    uint64_t session_id;
 42};
 43
 44// 委托回调和查询历史委托的数据结构相似, 共用一个struct, 多出来的字段查询历史委托用的
 45struct BufferXTPOrderInfo {
 46    XTPOrderInfo order_info;
 47    XTPRI error_info;
 48    int request_id;
 49    bool is_last;
 50    uint64_t session_id;
 51};
 52
 53// xtp的子线程Order回调函数, 直接将数据写入共享内存, 不做任何业务处理
 54void TraderXTP::OnOrderEvent(XTPOrderInfo *order_info, XTPRI *error_info, uint64_t session_id) {
 55    if (nullptr == order_info) {
 56        SPDLOG_ERROR("XTPOrderInfo is nullptr");
 57        return;
 58    }
 59    SPDLOG_DEBUG("XTPOrderInfo: {}", to_string(*order_info));
 60    auto &bf_order_info = get_thread_writer()->open_custom_data<BufferXTPOrderInfo>(kXTPOrderInfoType, now());
 61    memcpy(&bf_order_info.order_info, order_info, sizeof(XTPOrderInfo));
 62    bf_order_info.session_id = session_id;
 63    if (error_info != nullptr) {
 64        memcpy(&bf_order_info.error_info, error_info, sizeof(XTPRI));
 65    } else {
 66        memset(&bf_order_info.error_info, 0, sizeof(XTPRI));
 67    }
 68    SPDLOG_DEBUG("BufferXTPOrderInfo: {}", to_string(bf_order_info));
 69    get_thread_writer()->close_data();
 70}
 71
 72// 主线程处理, 将自定义的struct读出, 对应的数据分解回原来的样子, 传给对应的Order处理函数
 73bool TraderXTP::custom_OnOrderEvent(const event_ptr &event) {
 74    const auto *bf_order_info = reinterpret_cast<const BufferXTPOrderInfo *>(event->data_address());
 75    return custom_OnOrderEvent(bf_order_info->order_info, bf_order_info->error_info, bf_order_info->session_id);
 76}
 77
 78// 主线程处理, API接口回调参数是指针, 需要做指针判空处理, 当触发该处理函数时一定是有有效数据的, 改成使用引用免去指针判空
 79bool TraderXTP::custom_OnOrderEvent(const XTPOrderInfo &order_info, const XTPRI &error_info, uint64_t session_id) {
 80    SPDLOG_DEBUG("XTPOrderInfo: {}", to_string(order_info));
 81    SPDLOG_DEBUG("session_id: {}, XTPRI: {}", session_id, to_string(error_info));
 82
 83    // 所有容器访问不需要加锁
 84    auto order_xtp_id_iter = map_xtp_to_kf_order_id_.find(order_info.order_xtp_id);
 85    if (order_xtp_id_iter == map_xtp_to_kf_order_id_.end()) {
 86        SPDLOG_WARN("unrecognized order_xtp_id {}@{}", order_info.order_xtp_id, trading_day_);
 87        return generate_external_order(order_info);
 88    }
 89
 90    uint64_t kf_order_id = order_xtp_id_iter->second;
 91    if (not has_order(kf_order_id)) {
 92        return generate_external_order(order_info);
 93    }
 94
 95    auto &order_state = get_order(kf_order_id);
 96    if (not is_final_status(order_state.data.status) or order_state.data.status == OrderStatus::Lost) {
 97        from_xtp_no_price_type(order_info, order_state.data);
 98        order_state.data.update_time = yijinjing::time::now_in_nano();
 99        if (error_info.error_id != 0) {
100        order_state.data.error_id = error_info.error_id;
101        order_state.data.error_msg = error_info.error_msg;
102        }
103        try_write_to(order_state.data, order_state.dest);
104        SPDLOG_DEBUG("Order: {}", order_state.data.to_string());
105        try_deal_XTPTradeReport(order_info.order_xtp_id);
106    }
107    return true;
108}
109
110
111// xtp的子线程成交推送回调函数, 直接将数据写入共享内存, 不做任何业务处理
112void TraderXTP::OnTradeEvent(XTPTradeReport *trade_info, uint64_t session_id) {
113    if (nullptr == trade_info) {
114        SPDLOG_ERROR("XTPTradeReport is nullptr");
115        return;
116    }
117    SPDLOG_DEBUG("XTPTradeReport: {}", to_string(*trade_info));
118
119    auto &bf_trade_info = get_thread_writer()->open_custom_data<BufferXTPTradeReport>(kXTPTradeReportType, now());
120    memcpy(&bf_trade_info.trade_info, trade_info, sizeof(XTPTradeReport));
121    bf_trade_info.session_id = session_id;
122    SPDLOG_DEBUG("BufferXTPOrderInfo: {}", to_string(bf_trade_info));
123    get_thread_writer()->close_data();
124}
125
126// 主线程处理, 将自定义的struct读出, 对应的数据分解回原来的样子, 传给对应的Trade处理函数
127bool TraderXTP::custom_OnTradeEvent(const event_ptr &event) {
128    const auto *bf_trade_info = reinterpret_cast<const BufferXTPTradeReport *>(event->data_address());
129    return custom_OnTradeEvent(bf_trade_info->trade_info, bf_trade_info->session_id);
130}
131
132
133// 主线程处理, API接口回调参数是指针, 需要做指针判空处理, 当触发该处理函数时一定是有有效数据的, 改成使用引用免去指针判空
134bool TraderXTP::custom_OnTradeEvent(const XTPTradeReport &trade_info, uint64_t session_id) {
135    SPDLOG_DEBUG("XTPTradeReport: {}", to_string(trade_info));
136    SPDLOG_DEBUG("session_id: {}", session_id);
137
138    // 所有容器访问不需要加锁
139    auto order_xtp_id_iter = map_xtp_to_kf_order_id_.find(trade_info.order_xtp_id);
140    if (order_xtp_id_iter == map_xtp_to_kf_order_id_.end()) {
141        SPDLOG_WARN("unrecognized order_xtp_id {}, store in map_xtp_order_id_to_XTPTradeReports_", trade_info.order_xtp_id);
142        add_XTPTradeReport(trade_info);
143        return false;
144    }
145
146    if (has_dealt_trade(trade_info.order_xtp_id, trade_info.exec_id)) {
147        SPDLOG_DEBUG("order_xtp_id:{}, exec_id: {}, has dealt", trade_info.order_xtp_id, trade_info.exec_id);
148        return false;
149    }
150
151    uint64_t kf_order_id = order_xtp_id_iter->second;
152    if (not has_order(kf_order_id)) {
153        SPDLOG_ERROR("no order_id {} in orders_", kf_order_id);
154        return false;
155    }
156
157    add_dealt_trade(trade_info.order_xtp_id, trade_info.exec_id);
158    auto &order_state = get_order(kf_order_id);
159
160    if (has_writer(order_state.dest)) {
161        auto writer = get_writer(order_state.dest);
162        Trade &trade = writer->open_data<Trade>(now());
163        from_xtp(trade_info, trade);
164        trade.trade_id = writer->current_frame_uid();
165        trade.order_id = kf_order_id;
166        add_traded_volume(trade_info.order_xtp_id, trade.volume);
167        SPDLOG_DEBUG("Trade: {}", trade.to_string());
168        writer->close_data();
169    } else {
170        Trade trade{};
171        from_xtp(trade_info, trade);
172        trade.trade_id = get_public_writer()->current_frame_uid() xor (time::now_in_nano() & 0xFFFFFFFF);
173        trade.order_id = kf_order_id;
174        add_traded_volume(trade_info.order_xtp_id, trade.volume);
175        SPDLOG_DEBUG("Trade: {}", trade.to_string());
176        try_write_to(trade, order_state.dest);
177    }
178
179    if (not is_final_status(order_state.data.status) or order_state.data.status == OrderStatus::Lost) {
180        order_state.data.volume_left = std::min<int64_t>(
181            order_state.data.volume_left, order_state.data.volume - get_traded_volume(trade_info.order_xtp_id));
182        if (order_state.data.volume_left > 0) {
183        order_state.data.status = OrderStatus::PartialFilledActive;
184        }
185        order_state.data.update_time = now();
186        SPDLOG_DEBUG("Order: {}", order_state.data.to_string());
187        try_write_to(order_state.data, order_state.dest);
188    }
189    return true;
190}

Broker配置和启动流程

Broker配置

TD和MD的账户信息配置通过package.json来设置, 详情参考 package.json配置信息 一节.

通过前端界面配置的账户信息, 可以通过get_config()获取到一个json格式的字符串, 可以直接使用Kungfu引入的nlohmann json库进行解析, 使用方式有两种,

  1. 直接使用json对象通过key和value键值对获取前端输入的信息.

  2. 定义一个struct并且配置from_json函数, nlohmann json库可以直接将json格式的字符串转换成对应的struct对象.

建议使用第二种, 以下是xtp的范例

 1// 自定义一个struct
 2struct MDConfiguration {
 3    int client_id;
 4    std::string account_id;
 5    std::string password;
 6    std::string md_ip;
 7    int md_port;
 8    std::string protocol;
 9    int buffer_size;
10    bool query_instruments;
11};
12
13// 定义如何从json转换成struct对象
14void from_json(const nlohmann::json &j, MDConfiguration &c) {
15    j.at("client_id").get_to(c.client_id);
16    j.at("account_id").get_to(c.account_id);
17    j.at("password").get_to(c.password);
18    j.at("md_ip").get_to(c.md_ip);
19    j.at("md_port").get_to(c.md_port);
20    c.protocol = j.value("protocol", "tcp");
21    if (c.protocol != "udp") {
22        c.protocol = "tcp";
23    }
24    c.buffer_size = j.value("buffer_size", 64);
25    c.query_instruments = j.value<bool>("query_instruments", false);
26}
27
28// 获取前端配置的参数信息
29void MarketDataXTP::on_start() {
30    MDConfiguration config = nlohmann::json::parse(get_config());
31    // 其他处理代码
32}
  1{
  2    "name": "@kungfu-trader/kfx-broker-xtp-demo",
  3    "author": {
  4        "name": "Kungfu Trader",
  5        "email": "info@kungfu.link"
  6    },
  7    "version": "2.7.5-alpha.14",
  8    "description": "Kungfu Extension - XTP Demo",
  9    "license": "Apache-2.0",
 10    "main": "package.json",
 11    "repository": {
 12        "url": "https://github.com/kungfu-trader/kungfu.git"
 13    },
 14    "publishConfig": {
 15        "registry": "https://npm.pkg.github.com"
 16    },
 17    "binary": {
 18        "module_name": "kfx-broker-xtp-demo",
 19        "module_path": "dist/xtp",
 20        "remote_path": "{module_name}/v{major}/v{version}",
 21        "package_name": "{module_name}-v{version}-{platform}-{arch}-{configuration}.tar.gz",
 22        "host": "https://prebuilt.libkungfu.cc"
 23    },
 24    "scripts": {
 25        "build": "kfs extension build",
 26        "clean": "kfs extension clean",
 27        "format": "node ../../framework/core/.gyp/run-format-cpp.js src",
 28        "install": "node -e \"require('@kungfu-trader/kungfu-core').prebuilt('install')\"",
 29        "package": "kfs extension package"
 30    },
 31    "dependencies": {
 32        "@kungfu-trader/kungfu-core": "^2.7.5-alpha.14"
 33    },
 34    "devDependencies": {
 35        "@kungfu-trader/kungfu-sdk": "^2.7.5-alpha.14"
 36    },
 37    "kungfuDependencies": {
 38        "xtp": "v2.2.37.4"
 39    },
 40    "kungfuBuild": {
 41        "build_type": "Release",
 42        "cpp": {
 43            "target": "bind/python",
 44            "links": {
 45                "windows": [
 46                    "xtptraderapi",
 47                    "xtpquoteapi"
 48                ],
 49                "linux": [
 50                    "xtptraderapi",
 51                    "xtpquoteapi"
 52                ],
 53                "macos": [
 54                    "xtptraderapi",
 55                    "xtpquoteapi"
 56                ]
 57            }
 58        }
 59    },
 60    "kungfuConfig": {
 61        "key": "xtp",
 62        "name": "XTP",
 63        "language": {
 64            "zh-CN": {
 65                "account_name": "账户别名",
 66                "account_name_tip": "请填写账户别名",
 67                "account_id": "账户",
 68                "account_id_tip": "请填写账户, 例如:1504091",
 69                "password": "密码",
 70                "password_tip": "请填写密码, 例如:123456",
 71                "software_key": "软件密钥",
 72                "software_key_tip": "请填写软件密钥, 例如:b8aa7173bba3470e390d787219b2112",
 73                "td_ip": "交易IP",
 74                "td_ip_tip": "请填写交易IP, 例如:61.152.102.111",
 75                "td_port": "交易端口",
 76                "td_port_tip": "请填写交易端口, 例如:8601",
 77                "client_id": "客户ID",
 78                "client_id_tip": "请填写自定义多点登录ID 1-99整数,不同节点之间不要重复",
 79                "md_ip": "行情IP",
 80                "md_ip_tip": "请填写行情IP, 例如:61.152.102.110",
 81                "md_port": "行情端口",
 82                "md_port_tip": "请填写行情端口, 例如:8602",
 83                "protocol": "协议",
 84                "protocol_tip": "请选择协议, tcp 或者 udp",
 85                "buffer_size": "缓冲区大小",
 86                "buffer_size_tip": "请填写 缓冲区大小(mb)",
 87                "sync_external_order": "同步外部订单",
 88                "sync_external_order_msg": "是否同步外部订单",
 89                "sync_external_order_tip": "若开启则同步用户在其他交易软件的订单",
 90                "recover_order_trade": "恢复订单",
 91                "recover_order_trade_msg": "启动时是否查询恢复订单",
 92                "recover_order_trade_tip": "若开启则启动时查询今日委托和成交",
 93                "query_instruments": "查询可交易标的",
 94                "query_instruments_tip": "是否查询可交易标的, 开启后会查询所有可交易标的, 流量太大频繁查询可能导致账号或ip被XTP拉黑"
 95            },
 96            "en-US": {
 97                "account_name": "account name",
 98                "account_name_tip": "Please enter account name",
 99                "account_id": "account id",
100                "account_id_tip": "Please enter account id",
101                "password": "password",
102                "password_tip": "Please enter password, for example:123456",
103                "software_key": "software key",
104                "software_key_tip": "Please enter software key, for example :b8aa7173bba3470e390d787219b2112",
105                "td_ip": "td IP",
106                "td_ip_tip": "Please enter td IP, for example:61.152.102.111",
107                "td_port": "td port",
108                "td_port_tip": "Please enter td port, for example:8601",
109                "client_id": "client id",
110                "client_id_tip": "Please enter t user-defined multipoint client ID, which is an integer ranging from 1 to 99. The value must be unique on different nodes",
111                "md_ip": "md IP",
112                "md_ip_tip": "Please enter md IP, for example :61.152.102.110",
113                "md_port": "md port",
114                "md_port_tip": "Please enter md port, for example:8602",
115                "protocol": "protocol",
116                "protocol_tip": "Please select protocol, tcp or udp",
117                "buffer_size": "buffer size",
118                "buffer_size_tip": "Please enter buffer size(mb)",
119                "sync_external_order": "sync external order",
120                "sync_external_order_msg": "Whether open sync_external_order",
121                "sync_external_order_tip": "If enabled, it synchronizes users' orders in other trading software",
122                "recover_order_trade": "recover order trade",
123                "recover_order_trade_msg": "Whether recover order trade",
124                "recover_order_trade_tip": "If enabled, query order and trade when TD ready",
125                "query_instruments": "query instruments",
126                "query_instruments_tip": "If enabled, query instruments. too much infomation may result in account or ip blacklisted by XTP"
127            }
128        },
129        "config": {
130            "td": {
131                "type": [
132                    "stock"
133                ],
134                "settings": [
135                    {
136                        "key": "account_name",
137                        "name": "xtp.account_name",
138                        "type": "str",
139                        "tip": "xtp.account_name_tip"
140                    },
141                    {
142                        "key": "account_id",
143                        "name": "xtp.account_id",
144                        "type": "str",
145                        "required": true,
146                        "primary": true,
147                        "tip": "xtp.account_id_tip"
148                    },
149                    {
150                        "key": "password",
151                        "name": "xtp.password",
152                        "type": "password",
153                        "required": true,
154                        "tip": "xtp.password_tip"
155                    },
156                    {
157                        "key": "software_key",
158                        "name": "xtp.software_key",
159                        "type": "str",
160                        "required": true,
161                        "tip": "xtp.software_key_tip"
162                    },
163                    {
164                        "key": "td_ip",
165                        "name": "xtp.td_ip",
166                        "type": "str",
167                        "required": true,
168                        "tip": "xtp.td_ip_tip"
169                    },
170                    {
171                        "key": "td_port",
172                        "name": "xtp.td_port",
173                        "type": "int",
174                        "required": true,
175                        "tip": "xtp.td_port_tip"
176                    },
177                    {
178                        "key": "client_id",
179                        "name": "xtp.client_id",
180                        "type": "int",
181                        "required": true,
182                        "tip": "xtp.client_id_tip"
183                    },
184                    {
185                        "key": "sync_external_order",
186                        "name": "xtp.sync_external_order",
187                        "type": "bool",
188                        "errMsg": "xtp.sync_external_order_msg",
189                        "required": false,
190                        "default": false,
191                        "tip": "xtp.sync_external_order_tip"
192                    },
193                    {
194                        "key": "recover_order_trade",
195                        "name": "xtp.recover_order_trade",
196                        "type": "bool",
197                        "errMsg": "xtp.recover_order_trade_msg",
198                        "required": false,
199                        "default": false,
200                        "tip": "xtp.recover_order_trade_tip"
201                    }
202                ]
203            },
204            "md": {
205                "type": [
206                    "stock"
207                ],
208                "settings": [
209                    {
210                        "key": "account_id",
211                        "name": "xtp.account_id",
212                        "type": "str",
213                        "required": true,
214                        "tip": "xtp.account_id_tip",
215                        "default": "15011218"
216                    },
217                    {
218                        "key": "password",
219                        "name": "xtp.password",
220                        "type": "password",
221                        "required": true,
222                        "tip": "xtp.password_tip",
223                        "default": "PsVqy99v"
224                    },
225                    {
226                        "key": "md_ip",
227                        "name": "xtp.md_ip",
228                        "type": "str",
229                        "required": true,
230                        "tip": "xtp.md_ip_tip",
231                        "default": "119.3.103.38"
232                    },
233                    {
234                        "key": "md_port",
235                        "name": "xtp.md_port",
236                        "type": "int",
237                        "required": true,
238                        "tip": "xtp.md_port_tip",
239                        "default": 6002
240                    },
241                    {
242                        "key": "protocol",
243                        "name": "xtp.protocol",
244                        "type": "select",
245                        "options": [
246                            {
247                                "value": "tcp",
248                                "label": "tcp"
249                            },
250                            {
251                                "value": "udp",
252                                "label": "udp"
253                            }
254                        ],
255                        "required": false,
256                        "tip": "xtp.protocol_tip",
257                        "default": "tcp"
258                    },
259                    {
260                        "key": "buffer_size",
261                        "name": "xtp.buffer_size",
262                        "type": "int",
263                        "tip": "xtp.buffer_size_tip",
264                        "required": false
265                    },
266                    {
267                        "key": "client_id",
268                        "name": "xtp.client_id",
269                        "type": "int",
270                        "required": true,
271                        "tip": "xtp.client_id_tip",
272                        "default": 23
273                    },
274                    {
275                        "key": "query_instruments",
276                        "name": "xtp.query_instruments",
277                        "type": "bool",
278                        "required": false,
279                        "tip": "xtp.query_instruments_tip",
280                        "default": false
281                    }
282                ]
283            }
284        }
285    }
286}

Broker启动流程

柜台和行情源模块都继承 class BrokerService, 在启动进程时, 会先执行pre_start()做一些启动前的配置处理, 例如, 设置是否从恢复今日委托到内存, 创建Band信道等.

根据pre_start()的设置完成配置处理后, 执行on_start(), 这里一般都是设置柜台链接ip地址和端口, 以及账户密码进行连接.

当进程执行结束退出前, 会执行on_exit(), 这里一般是用于退出登录和内存释放.

 1class BrokerService {
 2    public:
 3    typedef longfist::enums::BrokerState BrokerState;
 4
 5    explicit BrokerService(BrokerVendor &vendor);
 6
 7    virtual ~BrokerService() = default;
 8
 9    virtual void pre_start();
10
11    virtual void on_start();
12
13    virtual void on_exit();
14
15    // 其他属性
16};

TD启动流程

对于同步登陆接口, 在on_start中调用登陆接口后能立刻判断是否登录成功, 登陆成功后, 立即调用恢复Order和Trade的函数, 如果设置跳过了恢复今日委托, 则立即进入Ready状态; 否则在恢复完Order和Trade之后再进入Ready状态, 进入Ready状态后会立即执行req_position和req_account, 查询持仓和资金, 之后就可以进行委托报单操作.

对于异步登陆接口, 需要在登陆响应回调函数中才能知道是否登陆成功, 后续执行与同步登录接口同样的操作.

范例

 1// xtp柜台在pre_start配置是否需要恢复今日委托
 2void TraderXTP::pre_start() {
 3    config_ = nlohmann::json::parse(get_config());
 4    SPDLOG_INFO("config: {}", get_config());
 5    if (not config_.recover_order_trade) {
 6        disable_recover();
 7    }
 8}
 9
10
11// xtp柜台在on_start进行登录
12void TraderXTP::on_start() {
13    if (config_.client_id < 1 or config_.client_id > 99) {
14        SPDLOG_ERROR("client_id must between 1 and 99");
15    }
16    std::string runtime_folder = get_runtime_folder();
17    SPDLOG_INFO("Connecting XTP account {} with tcp://{}:{}", config_.account_id, config_.td_ip, config_.td_port);
18    api_ = XTP::API::TraderApi::CreateTraderApi(config_.client_id, runtime_folder.c_str());
19    api_->RegisterSpi(this);
20    api_->SubscribePublicTopic(XTP_TERT_QUICK);
21    api_->SetSoftwareVersion("1.1.0");
22    api_->SetSoftwareKey(config_.software_key.c_str());
23    session_id_ = api_->Login(config_.td_ip.c_str(), config_.td_port, config_.account_id.c_str(),
24                                config_.password.c_str(), XTP_PROTOCOL_TCP);
25    if (session_id_ > 0) {
26        SPDLOG_INFO("Login successfully");
27        req_order_trade(); // 登陆成功后恢复Order和Trade, 恢复完后将TD状态设置成Ready
28    } else {
29        update_broker_state(BrokerState::LoginFailed);
30        SPDLOG_ERROR("Login failed [{}]: {}", api_->GetApiLastError()->error_id, api_->GetApiLastError()->error_msg);
31    }
32}
33
34// 查询恢复Order和Trade
35void TraderXTP::req_order_trade() {
36    if (disable_recover_) {
37        return try_ready(); // 设置跳过恢复则立即设置成Ready
38    }
39
40    XTPQueryOrderReq query_order_param{};
41    int ret = api_->QueryOrders(&query_order_param, session_id_, get_request_id());
42    if (0 != ret) {
43        SPDLOG_ERROR("QueryOrders False: {}", ret);
44    }
45
46    XTPQueryTraderReq query_trade_param{};
47    ret = api_->QueryTrades(&query_trade_param, session_id_, get_request_id());
48    if (0 != ret) {
49        SPDLOG_ERROR("QueryTrades False : {}", ret);
50    }
51}
52
53// 将TD状态设置成Ready
54void TraderXTP::try_ready() {
55    if (BrokerState::Ready == get_state()) {
56        return;
57    }
58
59    SPDLOG_DEBUG("req_order_over_: {}, req_trade_over_: {}", req_order_over_, req_trade_over_);
60    if (disable_recover_ or (req_order_over_ and req_trade_over_)) {
61        update_broker_state(BrokerState::Ready);
62    }
63}
64
65// xtp柜台在on_exit里退出登录
66void TraderXTP::on_exit() {
67    if (api_ != nullptr and session_id_ > 0) {
68        auto result = api_->Logout(session_id_);
69        SPDLOG_INFO("Logout with return code {}", result);
70    }
71}

MD启动流程

MD的登录流程相对简单, 对于同步登陆接口, 直接在on_start里面将MD状态设置成Ready; 对于异步登陆接口, 在登陆响应接口直接将MD状态设置成Ready.

范例

 1// xtp行情源在pre_start里申请创建两个页大小为256MB的band信道
 2void MarketDataXTP::pre_start() {
 3    entrust_band_uid_ = request_band("market-data-band-entrust", 256);
 4    transaction_band_uid_ = request_band("market-data-band-transaction", 256);
 5}
 6
 7// xtp行情源在on_start里进行登录
 8void MarketDataXTP::on_start() {
 9    MDConfiguration config = nlohmann::json::parse(get_config());
10    if (config.client_id < 1 or config.client_id > 99) {
11        SPDLOG_ERROR("client_id must between 1 and 99");
12    }
13    auto md_ip = config.md_ip.c_str();
14    auto account_id = config.account_id.c_str();
15    auto password = config.password.c_str();
16    auto protocol_type = get_xtp_protocol_type(config.protocol);
17    std::string runtime_folder = get_runtime_folder();
18    SPDLOG_INFO("Connecting XTP MD for {} at {}://{}:{}", account_id, config.protocol, md_ip, config.md_port);
19    api_ =
20        XTP::API::QuoteApi::CreateQuoteApi(config.client_id, runtime_folder.c_str(), XTP_LOG_LEVEL::XTP_LOG_LEVEL_INFO);
21    if (config.protocol == "udp") {
22        api_->SetUDPBufferSize(config.buffer_size);
23    }
24    api_->RegisterSpi(this);
25    if (api_->Login(md_ip, config.md_port, account_id, password, protocol_type) == 0) {
26        update_broker_state(BrokerState::LoggedIn);
27        update_broker_state(BrokerState::Ready);
28        SPDLOG_INFO("login success! (account_id) {}", config.account_id);
29        if (config.query_instruments and not check_if_stored_instruments(time::strfnow("%Y%m%d"))) {
30        api_->QueryAllTickers(XTP_EXCHANGE_SH);
31        api_->QueryAllTickers(XTP_EXCHANGE_SZ);
32        api_->QueryAllTickersFullInfo(XTP_EXCHANGE_SH);
33        api_->QueryAllTickersFullInfo(XTP_EXCHANGE_SZ);
34        }
35    } else {
36        update_broker_state(BrokerState::LoginFailed);
37        SPDLOG_ERROR("failed to login, [{}] {}", api_->GetApiLastError()->error_id, api_->GetApiLastError()->error_msg);
38    }
39}

交易柜台对接

交易接口基类

  1class Trader : public BrokerService {
  2public:
  3    explicit Trader(BrokerVendor &vendor) : BrokerService(vendor){};
  4
  5    virtual longfist::enums::AccountType get_account_type() const = 0;
  6
  7    // 普通委托报单
  8    virtual bool insert_order(const event_ptr &event) = 0;
  9
 10    // 预埋单报单
 11    virtual bool insert_order_trigger(const event_ptr &event);
 12
 13    // 大宗交易报单
 14    virtual bool insert_block_order(const event_ptr &event, const longfist::types::BlockMessage &block_message);
 15
 16    // 批量委托报单
 17    virtual bool insert_batch_orders(const event_ptr &event, const OrderInputs &order_inputs);
 18
 19    // 算法单添加
 20    virtual bool insert_algo_order(const event_ptr &event);
 21
 22    // 委托撤单(预埋撤单和普通撤单)
 23    virtual bool cancel_order(const event_ptr &event) = 0;
 24
 25    // 预埋单撤单
 26    virtual bool cancel_order_trigger(const event_ptr &event);
 27
 28    // 算法单删除
 29    virtual bool cancel_algo_order(const event_ptr &event);
 30
 31    // 算法单启动/停止
 32    virtual bool toggle_algo_order(const event_ptr &event);
 33
 34    // 查询持仓
 35    virtual bool req_position() = 0;
 36
 37    // 查询资金
 38    virtual bool req_account() = 0;
 39
 40    // 查询预埋单状态
 41    virtual bool req_order_trigger();
 42
 43    // 两融合约查询
 44    virtual bool req_contract();
 45
 46    // 查询算法单(系统未调用, 最后再确认)
 47    virtual bool req_algo_order(const event_ptr &event);
 48
 49    // 查询历史委托
 50    virtual bool req_history_order(const event_ptr &event);
 51
 52    // 查询历史成交
 53    virtual bool req_history_trade(const event_ptr &event);
 54
 55    // 收到master广播的Band数据会触发该回调, 用于订阅band信道
 56    virtual void on_band(const event_ptr &event) {}
 57
 58    // 收到一个TimeKeyValue数据时触发回调
 59    virtual void on_time_key_value(const event_ptr &event) {}
 60
 61    // 收到一个策略的Deregister数据时触发回调, 表明Master检测到该进程已经退出
 62    virtual bool on_strategy_exit(const event_ptr &event);
 63
 64    // TD启动时处理风控信息
 65    void on_risk_setting(const longfist::types::RiskSetting &risk_setting);
 66
 67    // 获取该TD的location uname
 68    [[nodiscard]] const std::string &get_account_id() const;
 69
 70    // 获取写入资金Asset的writer
 71    [[nodiscard]] yijinjing::journal::writer_ptr get_asset_writer() const;
 72
 73    // 获取写入持仓Position的writer
 74    [[nodiscard]] yijinjing::journal::writer_ptr get_position_writer() const;
 75
 76
 77    // 启动时第一次获取到的资金Asset写到PUBLIC, 每分钟同步时需要写到SYNC, 调用该函数进行切换
 78    void enable_asset_sync();
 79
 80    // 启动时第一次获取到的持仓Position写到PUBLIC, 每分钟同步时需要写到SYNC, 调用该函数进行切换
 81    void enable_positions_sync();
 82
 83    [[nodiscard]] const OrderMap &get_orders() const;
 84
 85    [[nodiscard]] bool has_order(uint64_t order_id) const;
 86
 87    [[nodiscard]] kungfu::state<longfist::types::Order> &get_order(uint64_t order_id);
 88
 89    [[nodiscard]] const OrderActionMap &get_order_actions() const;
 90
 91    [[nodiscard]] bool has_order_action(uint64_t action_id) const;
 92
 93    [[nodiscard]] kungfu::state<longfist::types::OrderAction> &get_order_action(uint64_t action_id);
 94
 95    [[nodiscard]] const TradeMap &get_trades() const;
 96
 97    [[nodiscard]] const OrderTriggerMap &get_order_triggers() const;
 98
 99    [[nodiscard]] bool has_order_trigger(uint64_t trigger_id) const;
100
101    [[nodiscard]] kungfu::state<longfist::types::OrderTrigger> &get_order_trigger(uint64_t trigger_id);
102
103    [[nodiscard]] const OrderTriggerActionMap &get_order_trigger_actions() const;
104
105    [[nodiscard]] bool has_order_trigger_action(uint64_t action_id) const;
106
107    [[nodiscard]] kungfu::state<longfist::types::OrderTriggerAction> &get_order_trigger_action(uint64_t action_id);
108
109    [[nodiscard]] const AlgoOrderMap &get_algo_orders() const;
110
111    [[nodiscard]] bool has_algo_order(uint64_t algo_order_id) const;
112
113    [[nodiscard]] kungfu::state<longfist::types::AlgoOrder> &get_algo_order(uint64_t algo_order_id);
114
115    [[nodiscard]] const AlgoOrderActionMap &get_algo_order_actions() const;
116
117    [[nodiscard]] uint32_t get_risk_uid() const;
118
119    void disable_recover();
120
121    virtual void on_recover(){};
122
123    [[nodiscard]] bool is_sync_account() const { return sync_account_; }
124
125    void enable_sync_account() { sync_account_ = true; }
126
127    void disable_sync_account() { sync_account_ = false; }
128
129    void try_req_account();
130
131protected:
132    bool disable_recover_ = false;
133
134private:
135    bool sync_asset_ = false;
136    bool sync_position_ = false;
137    bool sync_account_ = false;
138    uint32_t risk_uid_ = 0;
139
140    void on_asset_sync();
141
142    void on_position_sync();
143
144    void recover();
145
146    void recover_from_journal();
147
148    void deal_write_frame();
149
150    void deal_read_frame();
151
152    [[nodiscard]] OrderService &get_order_service();
153
154    [[nodiscard]] const OrderService &get_order_service() const;
155
156    [[nodiscard]] OrderTriggerService &get_order_trigger_service();
157
158    [[nodiscard]] const OrderTriggerService &get_order_trigger_service() const;
159
160    [[nodiscard]] AlgoOrderService &get_algo_order_service();
161
162    [[nodiscard]] const AlgoOrderService &get_algo_order_service() const;
163};

主要回调接口

insert_order

普通委托报单

virtual bool insert_order(const event_ptr &event) = 0;

收到委托报单输入OrderInput时会调用该函数, 调用 event->data<OrderInput>() 获取委托内容, 将OrderInput中的待报单信息, 按照柜台api的要求直接填写或者构建所需的消息体后填写, 调用api完成报单发送.

报单完成后TD需要生成一个Order写回给报单的进程, 通知该笔委托的状态.

API的委托报单接口分为同步和异步两种,

  1. 同步报单: 调用完API下单接口后会立刻获取该委托在柜台服务器的委托编号

    需要立刻建立 <API委托号, order_id> 映射关系, 在收到成交推送后根据此映射生成Trade和修改Order状态

  2. 异步报单: 调用完API下单接口后只能获取是否发送报单信息成功

    在API的报单响应回调中才能获取该委托在柜台服务器的委托号, 在报单过程中一般会携带一个本地定义的request_id, 在报单响应中会有request_id和API委托号, 需要在下单时构建好 <request_id, order_id>的映射关系, 在报单响应中根绝request_id找到order_id, 再建立 <API委托号, order_id> 映射关系.

参数

参数

类型

说明

event

const event_ptr &

包含待报单信息以及来源和触发时间等信息

返回值

类型

说明

bool

报单成功返回true, 报单失败返回false

范例

  1// 异步报单, ctp的insert_order实现
  2bool TraderCTP::insert_order(const event_ptr &event) {
  3    const OrderInput &input = event->data<OrderInput>();
  4    SPDLOG_DEBUG("OrderInput: {}", input.to_string());
  5
  6    int request_id = get_request_id();
  7    int error_id = 0;
  8    uint64_t orderRef_key = 0;
  9
 10    CThostFtdcInputOrderField ctp_input{};
 11    to_ctp(ctp_input, input);
 12    strcpy(ctp_input.BrokerID, config_.broker_id.c_str());
 13    strcpy(ctp_input.InvestorID, config_.account_id.c_str());
 14    strcpy(ctp_input.OrderRef, std::to_string(++order_ref_).c_str());
 15    SPDLOG_DEBUG("CThostFtdcInputOrderField: {}", to_string(ctp_input));
 16    error_id = api_->ReqOrderInsert(&ctp_input, request_id);
 17    orderRef_key = get_orderRef_key(front_id_, session_id_, ctp_input.OrderRef);
 18
 19    auto nano = time::now_in_nano();
 20    auto writer = get_writer(event->source());
 21    Order &order = writer->open_data<Order>(event->gen_time());
 22    order_from_input(input, order);
 23    order.insert_time = nano;
 24    order.update_time = nano;
 25    order.time_condition = from_ctp_time_condition(ctp_input.TimeCondition);
 26
 27    if (error_id != 0) {
 28        order.error_id = error_id;
 29        order.status = OrderStatus::Error;
 30    }
 31
 32    SPDLOG_DEBUG("Order: {}", order.to_string());
 33    writer->close_data();
 34    map_request_id_to_kf_order_id_.insert_or_assign(request_id, uint64_t(input.order_id));
 35    map_OrderRefKey_to_kf_order_id_.insert_or_assign(orderRef_key, uint64_t(input.order_id));
 36    map_kf_order_id_to_OrderRef_.insert_or_assign(input.order_id, ctp_input.OrderRef);
 37    return error_id == 0;
 38}
 39
 40bool TraderCTP::custom_OnRtnOrder(const CThostFtdcOrderField &ctp_order) {
 41    SPDLOG_DEBUG("CThostFtdcOrderField: {}", to_string(ctp_order));
 42
 43    const std::string str_ExchangeID_OrderLocalID =
 44        make_ExchangeID_OrderSysID(ctp_order.ExchangeID, ctp_order.OrderLocalID);
 45    const std::string str_ExchangeID_OrderSysID = make_ExchangeID_OrderSysID(ctp_order.ExchangeID, ctp_order.OrderSysID);
 46    // orderRef_key 作为 request_id 中间桥梁的角色, 辅助链接 API委托和order_id
 47    uint64_t orderRef_key = get_orderRef_key(ctp_order.FrontID, ctp_order.SessionID, ctp_order.OrderRef);
 48    auto OrderRefKey_iter = map_OrderRefKey_to_kf_order_id_.find(orderRef_key);
 49    auto ExchangeID_OrderSysID_iter = map_ExchangeID_OrderSysID_to_kf_order_id_.find(str_ExchangeID_OrderSysID);
 50    auto ExchangeID_OrderLocalID_iter = map_ExchangeID_OrderSysID_to_kf_order_id_.find(str_ExchangeID_OrderLocalID);
 51
 52    // 系统外订单信息
 53    if (OrderRefKey_iter == map_OrderRefKey_to_kf_order_id_.end() and
 54        ExchangeID_OrderSysID_iter == map_ExchangeID_OrderSysID_to_kf_order_id_.end() and
 55        ExchangeID_OrderLocalID_iter == map_ExchangeID_OrderSysID_to_kf_order_id_.end()) {
 56        SPDLOG_WARN("external Order with ExchangeID: {}, OrderSysID: {}, OrderLocalID: {}", ctp_order.ExchangeID,
 57                    ctp_order.OrderSysID, ctp_order.OrderLocalID);
 58        return generate_external_order(ctp_order);
 59    }
 60
 61    uint64_t kf_order_id = 0;
 62    if (strlen(ctp_order.OrderSysID) != 0 and
 63        ExchangeID_OrderSysID_iter != map_ExchangeID_OrderSysID_to_kf_order_id_.end()) {
 64        // 已经存在 <ExchangeID_OrderSysID, order_id>, 表示撤单或者是重新查询委托状态信息
 65        kf_order_id = ExchangeID_OrderSysID_iter->second;
 66    } else if (OrderRefKey_iter != map_OrderRefKey_to_kf_order_id_.end()) {
 67        // 已经存在 <OrderRefKey, order_id>, 表示本次连接下的单
 68        kf_order_id = OrderRefKey_iter->second;
 69    } else if (ExchangeID_OrderLocalID_iter != map_ExchangeID_OrderSysID_to_kf_order_id_.end()) {
 70        // 表示系统外委托的第二次推送, 第一次推送时只有 OrderLocalID 没有 OrderLocalID
 71        kf_order_id = ExchangeID_OrderLocalID_iter->second;
 72    } else {
 73        SPDLOG_ERROR("invalidate CThostFtdcOrderField: {}", to_string(ctp_order));
 74        return false;
 75    }
 76
 77    // 该笔报单请求首次到达CTP, 风控通过后返回的第1个OnRtnOrder回报, 此时因为还没有报入到交易所,
 78    // 所以回报中OrderSysID为空
 79    if (strlen(ctp_order.OrderSysID) != 0) {
 80        // 建立 <API委托号, order_id> 映射关系
 81        map_kf_order_id_to_OrderSysID_.insert_or_assign(kf_order_id, ctp_order.OrderSysID);
 82        map_ExchangeID_OrderSysID_to_kf_order_id_.insert_or_assign(str_ExchangeID_OrderSysID, kf_order_id);
 83    }
 84
 85    if (not has_order(kf_order_id)) {
 86        SPDLOG_WARN("no order_id {} in orders_", kf_order_id);
 87        return generate_external_order(ctp_order);
 88    }
 89
 90    auto &order_state = get_order(kf_order_id);
 91
 92    // 订单状态已经处于最终状态了就不再改动, 重连查询的时候会触发该情况
 93    if (is_final_status(order_state.data.status) and order_state.data.status != longfist::enums::OrderStatus::Lost and
 94        order_state.data.status != longfist::enums::OrderStatus::Cancelling) {
 95        return true;
 96    }
 97
 98    from_ctp(ctp_order, order_state.data);
 99    order_state.data.update_time = time::now_in_nano();
100    if (has_writer(order_state.dest)) {
101        write_to(order_state.data, order_state.dest);
102    } else {
103        ++try_write_to_order_count;
104        try_write_to(order_state.data, order_state.dest, [&]() {
105        --try_write_to_order_count;
106        try_ready();
107        });
108    }
109    try_deal_trade(str_ExchangeID_OrderSysID);
110    return true;
111}
112
113
114// 同步报单, xtp的insert_order实现
115bool TraderXTP::insert_order(const event_ptr &event) {
116    const OrderInput &input = event->data<OrderInput>();
117    SPDLOG_DEBUG("OrderInput: {}", input.to_string());
118    XTPOrderInsertInfo xtp_input = {};
119    to_xtp(xtp_input, input);
120
121    SPDLOG_DEBUG("XTPOrderInsertInfo: {}", to_string(xtp_input));
122    uint64_t order_xtp_id = api_->InsertOrder(&xtp_input, session_id_);
123    auto success = order_xtp_id != 0;
124
125    auto nano = yijinjing::time::now_in_nano();
126    auto writer = get_writer(event->source());
127    Order &order = writer->open_data<Order>(event->gen_time());
128    order_from_input(input, order);
129    order.external_order_id = std::to_string(order_xtp_id).c_str();
130    order.insert_time = nano;
131    order.update_time = nano;
132
133    if (success) {
134        // 报单成功后直接返回 API委托号, 可以立刻建立 <API委托号, order_id> 映射关系
135        map_kf_to_xtp_order_id_.emplace(uint64_t(input.order_id), order_xtp_id);
136        map_xtp_to_kf_order_id_.emplace(order_xtp_id, uint64_t(input.order_id));
137    } else {
138        auto error_info = api_->GetApiLastError();
139        order.error_id = error_info->error_id;
140        order.error_msg = error_info->error_msg;
141        order.status = OrderStatus::Error;
142    }
143
144    SPDLOG_DEBUG("Order: {}", order.to_string());
145    writer->close_data();
146    if (not success) {
147        SPDLOG_ERROR("fail to insert order {}, error id {}, {}", to_string(xtp_input), (int)order.error_id,
148                    order.error_msg);
149    }
150    return success;
151}

insert_order_trigger

预埋单下单报单

virtual bool insert_order_trigger(const event_ptr &event);

收到预埋单输入OrderTriggerInput时会调用该函数, 调用 event->data<OrderTriggerInput>() 获取预埋单内容.

将OrderTriggerInput中的待报预埋单信息, 按照柜台api的要求直接填写或者构建所需的消息体后填写, 调用api完成报预埋单发送.

报单完成后TD需要生成一个OrderTrigger写回给报单的进程, 通知预埋单的状态.

参数

参数

类型

说明

event

const event_ptr &

包含待报预埋单信息以及来源和触发时间等信息

返回值

类型

说明

bool

预埋报单成功返回true, 预埋报单失败返回false

范例

 1// ctp的预埋下单为异步接口, 在响应接口才能获取到预埋单委托号 ParkedOrderID
 2bool TraderCTP::insert_order_trigger(const event_ptr &event) {
 3    const OrderTriggerInput &trigger_input = event->data<OrderTriggerInput>();
 4    SPDLOG_DEBUG("OrderTriggerInput: {}", trigger_input.to_string());
 5
 6    int request_id = get_request_id();
 7    int error_id = 0;
 8    uint64_t orderRef_key = 0;
 9
10    //  预埋单
11    CThostFtdcParkedOrderField ctp_parked_input{};
12    to_ctp_parked(ctp_parked_input, trigger_input);
13    strncpy(ctp_parked_input.GTDDate,
14            yijinjing::time::strftime(trigger_input.insert_time, KUNGFU_TRADING_DAY_FORMAT).c_str(), 9);
15    SPDLOG_DEBUG("insert_order ctp_parked_input. trigger.insert_time={}, GTDDate: {}", trigger_input.insert_time,
16                ctp_parked_input.GTDDate);
17    strcpy(ctp_parked_input.BrokerID, config_.broker_id.c_str());
18    strcpy(ctp_parked_input.InvestorID, config_.account_id.c_str());
19    strcpy(ctp_parked_input.OrderRef, std::to_string(++order_ref_).c_str());
20    ctp_parked_input.RequestID = request_id;
21    SPDLOG_DEBUG("CThostFtdcParkedOrderField: {}", to_string(ctp_parked_input));
22    error_id = api_->ReqParkedOrderInsert(&ctp_parked_input, request_id);
23    orderRef_key = get_orderRef_key(front_id_, session_id_, ctp_parked_input.OrderRef);
24
25    auto nano = time::now_in_nano();
26    auto writer = get_writer(event->source());
27    OrderTrigger &trigger = writer->open_data<OrderTrigger>(event->gen_time());
28    order_trigger_from_input(trigger_input, trigger);
29    trigger.insert_time = nano;
30    trigger.update_time = nano;
31    trigger.time_condition = from_ctp_time_condition(ctp_parked_input.TimeCondition);
32
33    if (error_id != 0) {
34        trigger.error_id = error_id;
35        trigger.status = OrderStatus::Error;
36    }
37
38    SPDLOG_DEBUG("OrderTrigger: {}", trigger.to_string());
39    writer->close_data();
40    map_request_id_to_kf_order_id_.insert_or_assign(request_id, uint64_t(trigger.trigger_id));
41    map_OrderRefKey_to_kf_order_id_.insert_or_assign(orderRef_key, uint64_t(trigger.trigger_id));
42    return error_id == 0;
43}
44
45// ctp预埋下单响应
46bool TraderCTP::custom_OnRspParkedOrderInsert(const CThostFtdcParkedOrderField &ParkedOrder,
47                                          const CThostFtdcRspInfoField &RspInfo, int nRequestID, bool bIsLast) {
48    SPDLOG_DEBUG("CThostFtdcParkedOrderField: {}", to_string(ParkedOrder));
49    SPDLOG_DEBUG("CThostFtdcRspInfoField: {}", to_string(RspInfo));
50    SPDLOG_DEBUG("nRequestID: {}, bIsLast: {}", nRequestID, bIsLast);
51
52    auto trigger_id_iter = map_request_id_to_kf_order_id_.find(nRequestID);
53    if (trigger_id_iter == map_request_id_to_kf_order_id_.end()) {
54        SPDLOG_ERROR("CANNOT FIND trigger_id of {} in map_request_id_to_kf_order_id_", nRequestID);
55        return false;
56    }
57
58    auto trigger_id = trigger_id_iter->second;
59    if (not has_order_trigger(trigger_id)) {
60        SPDLOG_ERROR("CANNOT FIND tigger_id {} in triggers_", trigger_id);
61        return false;
62    }
63
64    auto &trigger_state = get_order_trigger(trigger_id);
65    map_trigger_id_to_ParkedOrderID_.insert_or_assign(trigger_id,
66                                                        std::pair<std::string, bool>{ParkedOrder.ParkedOrderID, false});
67    map_ParkedOrderId_to_trigger_id_.insert_or_assign(ParkedOrder.ParkedOrderID, trigger_id);
68    trigger_state.data.status = parked_status_to_trigger_status(ParkedOrder.Status);
69    strncpy(trigger_state.data.external_trigger_id, ParkedOrder.ParkedOrderID, strlen(ParkedOrder.ParkedOrderID));
70    trigger_state.data.error_id = RspInfo.ErrorID;
71    const std::string msg = gbk2utf8(RspInfo.ErrorMsg);
72    strncpy(trigger_state.data.error_msg, msg.c_str(), msg.length());
73    trigger_state.data.update_time = time::now_in_nano();
74
75    if (RspInfo.ErrorID != 0) {
76        trigger_state.data.status = OrderStatus::Error;
77        SPDLOG_ERROR("failed to insert order, ErrorId: {} ErrorMsg: {}, parked_order: {}", RspInfo.ErrorID,
78                    gbk2utf8(RspInfo.ErrorMsg), to_string(ParkedOrder));
79    }
80
81    try_write_to(trigger_state.data, trigger_state.dest);
82    SPDLOG_DEBUG("OrderTrigger: {}", trigger_state.data.to_string());
83
84    return true;
85}

备注

预埋下单一般是在休盘时间报单, 委托信息挂在ctp服务器上, 当开盘时自动将委托信息提交到交易所.

在预埋单开盘触发前可以撤销, 但是开盘触发后, 不论下单是否成功, ctp都不会推送预埋单的状态信息, 需要手动查询才能获取预埋单的最新状态信息.


insert_block_order

大宗交易报单

virtual bool insert_block_order(const event_ptr &event, const longfist::types::BlockMessage &block_message);

大宗交易报单和普通委托报单类似, 委托信息存在event->data<OrderInput>()里, 区别是多了和大宗交易相关的额外信息BlockMessage, 按照柜台api的要求直接填写或者构建所需的消息体后填写, 调用api完成报大宗单发送.

报单完成后TD需要生成一个Order写回给报单的进程, 通知该笔委托的状态.

参数

参数

类型

说明

event

const event_ptr &

包含待报大宗单信息以及来源和触发时间等信息

block_message

const longfist::types::BlockMessage &

包含待报对手方席位号以及成交约定号等信息

返回值

类型

说明

bool

大宗报单成功返回true, 大宗报单失败返回false

范例

 1// 金证股票柜台的大宗交易报单
 2bool TraderMaCli::insert_block_order(const event_ptr &event, const longfist::types::BlockMessage &block_message) {
 3    const OrderInput &order_input = event->data<OrderInput>();
 4    SPDLOG_DEBUG("OrderInput Message : {}", order_input.to_string());
 5
 6    const int64_t request_id = get_request_id();
 7    auto nano = kungfu::yijinjing::time::now_in_nano();
 8    auto writer = get_writer(event->source());
 9    Order &order = writer->open_data<Order>(event->gen_time());
10    order_from_input(order_input, order);
11    set_offset(order);
12    order.status = OrderStatus::Pending;
13    order.insert_time = nano;
14    order.update_time = nano;
15
16    map_request_to_order_.emplace(request_id, order.order_id);
17
18    CReqStkOrderField stField{};
19    auto iter_maStkUserLoginFiled = map_maszStkbd_to_maStkUserLoginFiled_.find(
20        kf_to_macli_exchange_id(order_input.exchange_id, stField.szStkbd));
21    if (iter_maStkUserLoginFiled == map_maszStkbd_to_maStkUserLoginFiled_.end()) {
22        const std::string msg = "找不到和交易板块" + std::string(stField.szStkbd) + "对应的用户下单信息, 无法下单";
23        SPDLOG_ERROR("找不到和交易板块{}对应的用户下单信息, 无法下单", stField.szStkbd);
24        order.status = OrderStatus::Error;
25        order.error_id = -1;
26        strncpy(order.error_msg, msg.c_str(), msg.length());
27        SPDLOG_DEBUG("Order: {}", order.to_string());
28        writer->close_data();
29        return false;
30    }
31    const CRspStkUserLoginField &login_filed = iter_maStkUserLoginFiled->second;
32    stField.llCuacctCode = login_filed.llCuacctCode;
33    stField.llCustCode = login_filed.llCustCode;
34    strncpy(stField.szTrdacct, login_filed.szStkTrdacct, 20);
35    strncpy(stField.szStkCode, order_input.instrument_id, 8);
36    strncpy(stField.szOrderPrice, std::to_string(order_input.limit_price).c_str(), 21);
37    stField.llOrderQty = order_input.volume / get_vol_multi(order_input.exchange_id,
38                                                            get_instrument_type(order_input.exchange_id,
39                                                                                order_input.instrument_id));
40    if (order_input.instrument_type == InstrumentType::Repo) {
41        if (order_input.side != Side::Sell) {
42        order.status = OrderStatus::Error;
43        order.error_id = -1;
44        strncpy(order.error_msg, "逆回购只能以Sell方式下单", ERROR_MSG_LEN);
45        SPDLOG_DEBUG("Order: {}", order.to_string());
46        writer->close_data();
47        return false;
48        }
49        stField.iStkBiz = 165;
50    } else {
51        stField.iStkBiz = kf_to_macli_iStkBiz(order_input.side);
52    }
53    stField.iStkBizAction = kf_to_macli_iStkBizAction(order_input);
54    strncpy(stField.szClientInfo, cust_trace_info_.c_str(), 256);
55    stField.chSecurityLevel = '1';
56    int iOrderBsn = int(order_input.order_id & 0x0000FFFF);
57    stField.iOrderBsn = iOrderBsn;
58    stField.iCuacctSn = iCuacctSn_;
59
60    /// 大宗交易需要额外填写的信息
61    if (order_input.block_id != 0) {
62        stField.llMatchNo = block_message.match_number;  // 成交约定号
63        std::string str_REDUCT = block_message.is_specific ? "REDUCT=1" : "REDUCT=0";
64        stField.iStkBiz = kf_to_macli_block_iStkBiz(order_input);
65        strncpy(stField.szOpptStkpbu, block_message.opponent_seat, 8);
66        strncpy(stField.szOrderText, str_REDUCT.c_str(), 256);
67    }
68
69    SPDLOG_DEBUG("CReqStkOrderField: {}", to_string(stField));
70    int ret = api_->ReqOrder(&stField, request_id);
71    SPDLOG_DEBUG("ReqOrder ret: {}", ret);
72    if (0 != ret) {
73        const std::string msg = gbk2utf8(api_->GetLastErrorText());
74        SPDLOG_ERROR("下单失败, return code: {}, error msg: {}", ret, msg);
75        order.status = OrderStatus::Error;
76        order.error_id = ret;
77        strncpy(order.error_msg, msg.c_str(), msg.length());
78    }
79
80    SPDLOG_DEBUG("Order: {}", order.to_string());
81    writer->close_data();
82    return 0 == ret;
83}

备注

大部分柜台的大宗交易报单和普通委托报单是同一个委托函数, 委托响应函数也相同, 区别只是是否填写了大宗交易相关的信息: 对手方席号, 成交约定号, 是否受限(特定)股份


insert_batch_orders

批量委托报单

virtual bool insert_batch_orders(const event_ptr &event, const OrderInputs &order_inputs);

有的柜台可能会有流控限制, 发送消息的数量有限制, 如果需要一次性报很多笔委托, 需要用到批量委托接口, 待报批量单信息将会存在order_inputs中, 按照柜台api的要求直接填写或者构建所需的消息体后填写, 调用api完成报批量单发送.

参数

参数

类型

说明

event

const event_ptr &

包含待报批量单来源和触发时间等信息

order_inputs

const OrderInputs &

包含待报批量单的信息

返回值

类型

说明

bool

批量报单成功返回true, 批量报单失败返回false

范例

  1// 华锐股票柜台批量委托报单
  2bool TraderAtp::insert_batch_orders(const event_ptr &event, const broker::OrderInputs &order_inputs) {
  3    SPDLOG_DEBUG("insert_batch_orders");
  4    auto writer = get_writer(event->source());
  5    std::vector<uint64_t> orderids{};
  6    ATPReqBatchCashAuctionOrderMsg msg{};
  7    int64_t nano = time::now_in_nano();
  8    OrderInput first_order = order_inputs.front();
  9    for (const OrderInput &order_input : order_inputs) {
 10        APIBatchCashAuctionOrderUnit unit{};
 11        strcpy(unit.security_id, order_input.instrument_id);
 12        kf_exchange_2_hr_market(order_input.exchange_id, unit.market_id);
 13        kf_side_2_hr_side(order_input.side, unit.side);
 14        unit.order_qty = ATPTradeAPI::DoubleExpandToInt(
 15            double(order_input.volume) /
 16                get_vol_multi(order_input.instrument_id, order_input.exchange_id, order_input.instrument_type),
 17            2);
 18        unit.price = ATPTradeAPI::DoubleExpandToInt(order_input.limit_price, 4);
 19        kf_price_type_2_hr_order_type(order_input.price_type, unit.order_type);
 20        kf_exchange_2_hr_market(order_input.exchange_id, msg.market_id);
 21        msg.order_array.push_back(unit);
 22        SPDLOG_DEBUG("APIBatchCashAuctionOrderUnit: {}", to_string(unit));
 23
 24        auto &order = writer->open_data<Order>(event->gen_time());
 25        order_from_input(order_input, order);
 26        order.insert_time = nano;
 27        order.update_time = nano;
 28        order.status = OrderStatus::Pending;
 29        SPDLOG_DEBUG("Order: {}", order.to_string());
 30        writer->close_data();
 31        orderids.push_back(order.order_id);
 32    }
 33
 34    strcpy(msg.security_id, first_order.instrument_id);
 35    kf_exchange_2_hr_market(first_order.exchange_id,
 36                            msg.market_id);
 37    msg.side = ATPSideConst::kBuy;
 38    msg.order_qty = ATPTradeAPI::DoubleExpandToInt(100, 2);
 39    msg.price = ATPTradeAPI::DoubleExpandToInt(10, 4);
 40    msg.order_type = ATPOrdTypeConst::kOptimalFiveLevelFullDealTransferCancel;
 41
 42    strcpy(msg.cust_id, cust_id_.c_str());
 43    strcpy(msg.fund_account_id, td_config_.fund_account_id.c_str());
 44    strcpy(msg.branch_id, td_config_.branch_id.c_str());
 45    msg.client_seq_id = get_client_seq_id();
 46    msg.order_way = order_way_;
 47    strcpy(msg.password, str_cipher_.c_str());
 48    msg.client_feature_code = str_trace_info_;
 49
 50    switch (msg.market_id) {
 51    case ATPMarketIDConst::kShangHai:
 52        strcpy(msg.account_id, sh_account_id_.c_str());
 53        break;
 54    case ATPMarketIDConst::kShenZhen:
 55        strcpy(msg.account_id, sz_account_id_.c_str());
 56        break;
 57    default:
 58        SPDLOG_ERROR("Invalidated market_id : {}", msg.market_id);
 59    }
 60    msg.batch_type = ATPBatchTypeConst::kBatch;
 61    // 将华锐批量成交的id与kongfu的批量orderid相对应, 先添加map信息, 防止回调回来过快找不到对应的order_ids
 62    map_client_to_orderids_.emplace(msg.client_seq_id, orderids);
 63    SPDLOG_DEBUG("ATPReqBatchCashAuctionOrderMsg: {}", to_string(msg));
 64    int ret = atp_trader_api_ptr_->ReqBatchCashAuctionOrder(&msg);
 65    SPDLOG_DEBUG("ReqBatchCashAuctionOrder return code : {} , return message : {}", ret,
 66                map_error_code.try_emplace(ret).first->second);
 67    if (ret != ErrorCode::kSuccess) {
 68        SPDLOG_ERROR("FAILED TO INSERT ORDER !");
 69        return false;
 70    }
 71    return true;
 72}
 73
 74// 华锐股票柜台批量委托回调
 75bool TraderAtp::custom_OnRspOrderStatusInternalAck(const BufferATPRspOrderStatusAckMsg &internal_ack) {
 76    // 优先检测是否属于批量单
 77    auto order_ids_iter = map_client_to_orderids_.find(internal_ack.client_seq_id);
 78    if (order_ids_iter != map_client_to_orderids_.end()) {
 79        std::vector<uint64_t> &orderids = order_ids_iter->second;
 80        if (orderids.empty()) {
 81        SPDLOG_ERROR("orderids of client_seq_id: {} is empty", internal_ack.client_seq_id);
 82        return false;
 83        }
 84        uint64_t order_id = orderids.front();
 85        map_kf_orderid_to_hr_cl_ord_no_.emplace(order_id, internal_ack.cl_ord_no);
 86        map_hr_cl_ord_no_to_kf_orderid_.emplace(internal_ack.cl_ord_no, order_id);
 87        orderids.erase(orderids.begin());
 88
 89        if (not has_order(order_id)) {
 90        SPDLOG_WARN("order_id: {} not in orders_", order_id);
 91        generate_real_time_external_order(internal_ack);
 92        return false;
 93        }
 94
 95        auto &order_state = get_order(order_id);
 96
 97        if (not is_final_status(order_state.data.status) or order_state.data.status == OrderStatus::Lost) {
 98        hr_order_status_ack_2_kf_order(internal_ack, order_state.data);
 99        // const std::string str_cl_ord_no = std::to_string(internal_ack.cl_ord_no);
100        // strncpy(order_state.data.external_order_id, str_cl_ord_no.c_str(), str_cl_ord_no.length());
101        if (order_state.data.status == OrderStatus::Error) {
102            order_state.data.error_id = internal_ack.reject_reason_code;
103            strncpy(order_state.data.error_msg,
104                    map_error_code.try_emplace(internal_ack.reject_reason_code).first->second.c_str(), ERROR_MSG_LEN);
105        }
106        order_state.data.update_time = time::now_in_nano();
107        if (OrderStatus ::Pending != order_state.data.status and has_writer(order_state.dest)) {
108            get_writer(order_state.dest)->write(order_state.data.update_time, order_state.data);
109        } else {
110            SPDLOG_WARN("no writer of {}:{}", order_state.dest, get_vendor().get_location_uname(order_state.dest));
111            SPDLOG_WARN("batch Order: {}", order_state.data.to_string());
112        }
113        SPDLOG_DEBUG("batch Order:{}", order_state.data.to_string());
114        }
115        try_deal_TradeERMsgs_(internal_ack.cl_ord_no);
116        return false;
117    }
118    // 余下为普通委托处理代码
119}

cancel_order

普通撤单和预埋撤单

virtual bool cancel_order(const event_ptr &event) = 0;

event->data<OrderAction>()中有一个OrderActionFlag枚举值, 表明是普通撤单还是预埋撤单.

按照柜台api的要求直接填写或者构建所需的消息体后填写, 调用api完成撤单发送.

若撤单失败, 需要写一个OrderActionError给调用撤单的进程, 说明失败原因.

参数

参数

类型

说明

event

const event_ptr &

包含待撤单信息以及来源和触发时间等信息

返回值

类型

说明

bool

撤单成功返回true, 撤单失败返回false

范例

  1// CTP撤单实现
  2bool TraderCTP::cancel_order(const event_ptr &event) {
  3    const OrderAction &action = event->data<OrderAction>();
  4    SPDLOG_DEBUG("order_action: {}", action.to_string());
  5    int request_id = get_request_id();
  6    int error_id = 0;
  7
  8    auto fn_generate_error = [&](const std::string &msg) -> bool {
  9        auto writer = get_writer(event->source());
 10        OrderActionError &error = writer->open_data<OrderActionError>(event->gen_time());
 11        error.error_id = -1;
 12        error.order_id = action.order_id;
 13        error.order_action_id = action.order_action_id;
 14        strncpy(error.error_msg, msg.c_str(), msg.length());
 15        SPDLOG_ERROR("OrderActionError: {}", error.to_string());
 16        writer->close_data();
 17        return false;
 18    };
 19
 20    auto OrderSysID_iter = map_kf_order_id_to_OrderSysID_.find(action.order_id);
 21    auto OrderRef_iter = map_kf_order_id_to_OrderRef_.find(action.order_id);
 22    if (OrderSysID_iter == map_kf_order_id_to_OrderSysID_.end() and OrderRef_iter == map_kf_order_id_to_OrderRef_.end()) {
 23        const std::string msg = fmt::format("FAILED TO CANCEL order {}, can't find related ctp order id", action.order_id);
 24        return fn_generate_error(msg);
 25    }
 26
 27    if (not has_order(action.order_id)) {
 28        SPDLOG_ERROR("no kf_order_id {} in orders_", action.order_id);
 29        const std::string msg = fmt::format("{} not in orders_, try again later!", action.order_id);
 30        return fn_generate_error(msg);
 31    }
 32
 33    auto &order_state = get_order(action.order_id);
 34    map_request_id_to_kf_order_id_.insert_or_assign(request_id, action.order_id);
 35    map_request_id_to_kf_action_id_.insert_or_assign(request_id, action.order_action_id);
 36
 37    // 普通撤单
 38    if (action.action_flag == OrderActionFlag::Cancel) {
 39        CThostFtdcInputOrderActionField ctp_action{};
 40        strcpy(ctp_action.BrokerID, config_.broker_id.c_str());
 41        strcpy(ctp_action.InvestorID, config_.account_id.c_str());
 42        ctp_action.FrontID = front_id_;
 43        ctp_action.SessionID = session_id_;
 44        ctp_action.ActionFlag = THOST_FTDC_AF_Delete;
 45        strcpy(ctp_action.InstrumentID, order_state.data.instrument_id);
 46        strcpy(ctp_action.ExchangeID, order_state.data.exchange_id);
 47        strcpy(ctp_action.OrderSysID, order_state.data.external_order_id);
 48        if (OrderRef_iter != map_kf_order_id_to_OrderRef_.end()) {
 49        strncpy(ctp_action.OrderRef, OrderRef_iter->second.c_str(), OrderRef_iter->second.length());
 50        }
 51        SPDLOG_DEBUG("CThostFtdcInputOrderActionField: {}", to_string(ctp_action));
 52        error_id = api_->ReqOrderAction(&ctp_action, request_id);
 53
 54        if (not is_final_status(order_state.data.status)) {
 55        order_state.data.status = OrderStatus::Cancelling;
 56        try_write_to(order_state.data, order_state.dest);
 57        }
 58        SPDLOG_DEBUG("Order: {}", order_state.data.to_string());
 59        return error_id == 0;
 60    }
 61
 62    // 预埋撤单
 63    if (action.action_flag == OrderActionFlag::TriggerCancel) {
 64        CThostFtdcParkedOrderActionField ctp_parked_input_action{};
 65        strcpy(ctp_parked_input_action.InstrumentID, order_state.data.instrument_id);
 66        strcpy(ctp_parked_input_action.ExchangeID, order_state.data.exchange_id);
 67        ctp_parked_input_action.ActionFlag = THOST_FTDC_AF_Delete;
 68        strcpy(ctp_parked_input_action.BrokerID, config_.broker_id.c_str());
 69        strcpy(ctp_parked_input_action.InvestorID, config_.account_id.c_str());
 70        strcpy(ctp_parked_input_action.UserID, config_.account_id.c_str());
 71        strcpy(ctp_parked_input_action.OrderSysID, order_state.data.external_order_id);
 72        if (OrderRef_iter != map_kf_order_id_to_OrderRef_.end()) {
 73        strncpy(ctp_parked_input_action.OrderRef, OrderRef_iter->second.c_str(), OrderRef_iter->second.length());
 74        }
 75        SPDLOG_DEBUG("CThostFtdcParkedOrderActionField: {}", to_string(ctp_parked_input_action));
 76        error_id = api_->ReqParkedOrderAction(&ctp_parked_input_action, request_id);
 77
 78        auto nano = time::now_in_nano();
 79        auto writer = get_writer(event->source());
 80        OrderTrigger &trigger = writer->open_data<OrderTrigger>(event->gen_time());
 81        trigger.trigger_id = action.order_action_id;
 82        order_trigger_from_order(order_state.data, trigger);
 83        trigger.insert_time = nano;
 84        trigger.update_time = nano;
 85
 86        if (error_id != 0) {
 87        trigger.error_id = error_id;
 88        trigger.status = OrderStatus::Error;
 89        }
 90
 91        SPDLOG_DEBUG("OrderTrigger: {}", trigger.to_string());
 92        writer->close_data();
 93
 94        return error_id == 0;
 95    }
 96
 97    SPDLOG_ERROR("Unrecognized action_flag: {}", action.action_flag);
 98
 99    return false;
100}

备注

普通撤单会触发委托推送, 和普通下单类似.

预埋撤单是在休盘时间提交, 开盘后立刻撤单, 同预埋下单一样, 触发前可以撤销, 触发后需要手动查询.


cancel_order_trigger

预埋单撤销

virtual bool cancel_order_trigger(const event_ptr &event);

根据event->data<OrderTriggerAction>()的trigger_id找到对应的预埋单委托号, 按照柜台api的要求直接填写或者构建所需的消息体后填写, 调用api完成撤单发送.

如果撤单失败, 需要写一个OrderTriggerActionError给调用报单的进程, 说明失败原因.

参数

参数

类型

说明

event

const event_ptr &

包含待撤预埋单信息以及来源和触发时间等信息

返回值

类型

说明

bool

撤预埋单成功返回true, 撤预埋单失败返回false

范例

 1bool TraderCTP::cancel_order_trigger(const event_ptr &event) {
 2    const OrderTriggerAction &action = event->data<OrderTriggerAction>();
 3    SPDLOG_DEBUG("OrderTriggerAction: {}", action.to_string());
 4
 5    auto parked_id_iter = map_trigger_id_to_ParkedOrderID_.find(action.trigger_id);
 6    if (parked_id_iter == map_trigger_id_to_ParkedOrderID_.end()) {
 7        SPDLOG_ERROR("CANNOT FIND trigger_id {} in map_trigger_id_to_ParkedOrderID_", action.trigger_id);
 8        return false;
 9    }
10
11    const std::pair<std::string, bool> &str_parked_pair = parked_id_iter->second;
12    int request_id = get_request_id();
13    int error_id = 0;
14
15    auto fn_generate_error = [&](int32_t error_id, const std::string &msg) -> bool {
16        auto writer = get_writer(event->source());
17        OrderTriggerActionError &error = writer->open_data<OrderTriggerActionError>(event->gen_time());
18        error.error_id = error_id;
19        error.trigger_id = action.trigger_id;
20        error.order_trigger_action_id = action.order_trigger_action_id;
21        strncpy(error.external_trigger_id, str_parked_pair.first.c_str(), str_parked_pair.first.length());
22        strncpy(error.error_msg, msg.c_str(), msg.length());
23        error.insert_time = time::now_in_nano();
24        SPDLOG_ERROR("OrderTriggerActionError: {}", error.to_string());
25        writer->close_data();
26        return false;
27    };
28
29    // ParkedOrderId 是空的说明下预埋单的时候是失败的, 响应函数返回的ParkedOrderId是空
30    if (str_parked_pair.first.empty()) {
31        return fn_generate_error(-1, "待删除的OrderTrigger没有对应的ParkedId");
32    }
33
34    if (str_parked_pair.second) {
35        // 删除预埋撤单
36        CThostFtdcRemoveParkedOrderActionField ctp_remove_parked_order_action{};
37        strcpy(ctp_remove_parked_order_action.BrokerID, config_.broker_id.c_str());
38        strcpy(ctp_remove_parked_order_action.InvestorID, config_.account_id.c_str());
39        strcpy(ctp_remove_parked_order_action.InvestUnitID, config_.account_id.c_str());
40        strcpy(ctp_remove_parked_order_action.ParkedOrderActionID, str_parked_pair.first.c_str()); /// 预埋撤单编号
41        SPDLOG_DEBUG("CThostFtdcRemoveParkedOrderActionField: {}", to_string(ctp_remove_parked_order_action));
42        error_id = api_->ReqRemoveParkedOrderAction(&ctp_remove_parked_order_action, request_id);
43    } else {
44        // 删除预埋下单
45        CThostFtdcRemoveParkedOrderField ctp_remove_parked_order{};
46        strcpy(ctp_remove_parked_order.BrokerID, config_.broker_id.c_str());
47        strcpy(ctp_remove_parked_order.InvestorID, config_.account_id.c_str());
48        strcpy(ctp_remove_parked_order.InvestUnitID, config_.account_id.c_str());
49        strcpy(ctp_remove_parked_order.ParkedOrderID, str_parked_pair.first.c_str());
50        SPDLOG_DEBUG("CThostFtdcRemoveParkedOrderField: {}", to_string(ctp_remove_parked_order));
51        error_id = api_->ReqRemoveParkedOrder(&ctp_remove_parked_order, request_id);
52    }
53
54    if (error_id != 0) {
55        return fn_generate_error(error_id, "删除预埋单出错");
56    }
57
58    auto &trigger_state = get_order_trigger(action.trigger_id);
59    trigger_state.data.update_time = time::now_in_nano();
60    trigger_state.data.status = OrderStatus::Cancelling;
61    try_write_to(trigger_state.data, trigger_state.dest);
62
63    return true;
64}

备注

撤销预埋单成功之后, 需要手动查询预埋单才能获取预埋单的最新状态.


req_position

查询持仓

virtual bool req_position() = 0;

向券商柜台查询账户当前持仓, TD进程进入就绪状态是会触发该函数, 同时系统也会每分钟自动触发一次查询以同步最新的持仓状态.

将全局存储的账户等信息, 按照柜台api的要求直接填写或者构建所需的消息体后填写, 调用api完成持仓查询发送.

有的柜台支持同步查询和异步查询, 有的柜台只支持同步查询.

参数

参数

类型

说明

返回值

类型

说明

bool

查询持仓成功返回true, 查询持仓失败返回false

范例

 1// xtp 异步查询持仓
 2bool TraderXTP::req_position() {
 3    SPDLOG_INFO("req_position");
 4    return api_->QueryPosition(nullptr, session_id_, get_request_id()) == 0;
 5}
 6
 7bool TraderXTP::custom_OnQueryPosition(const XTPQueryStkPositionRsp &position, const XTPRI &error_info, int request_id,
 8                                   bool is_last, uint64_t session_id) {
 9    if (error_info.error_id != 0) {
10        SPDLOG_ERROR("error_id:{}, error_msg: {}, request_id: {}, last: {}", error_info.error_id, error_info.error_msg,
11                    request_id, is_last);
12        return false;
13    }
14
15    SPDLOG_TRACE("XTPQueryStkPositionRsp: {}", to_string(position));
16    auto writer = get_position_writer(); // 第一次获取writer是写入到PUBLIC
17    Position &stock_pos = writer->open_data<Position>(0);
18    from_xtp(position, stock_pos);
19    stock_pos.holder_uid = get_home_uid();
20    stock_pos.source_id = get_home_uid();
21    stock_pos.instrument_type = get_instrument_type(stock_pos.exchange_id, stock_pos.instrument_id);
22    stock_pos.direction = Direction::Long;
23    stock_pos.update_time = yijinjing::time::now_in_nano();
24    SPDLOG_TRACE("Position: {}", stock_pos.to_string());
25    writer->close_data();
26    if (is_last) {
27        PositionEnd &end = writer->open_data<PositionEnd>(0);
28        end.holder_uid = get_home_uid();
29        writer->close_data();
30        enable_positions_sync(); // 调用该函数后, 之后每分钟同步一次的查询获取到的writer是写入到SYNC
31    }
32    return true;
33}
34
35
36// 顶点股票柜台只支持同步查询
37bool TraderItp::req_position() {
38    long nRet = 1;
39    bool update = false;
40    int64 idx = 0;
41    auto writer = get_position_writer();
42    while (nRet > 0) {
43        vector<ITPDK_ZQGL> arZqgl;
44        arZqgl.reserve(200); // 需要预分配足够空间,查询结果最大返回200条
45        nRet =
46            (long)SECITPDK_QueryPositions(get_account_id().c_str(), SORT_TYPE_AES,
47                                        200, idx, "", "", "", 1, arZqgl);
48        SPDLOG_TRACE("req_position success. Num of results {}", nRet);
49        for (auto &itr : arZqgl) {
50        idx = itr.BrowIndex;
51        if (get_account_id() == itr.AccountId) {
52            update = true;
53            auto &pos = writer->open_data<Position>(0);
54            //        SPDLOG_DEBUG("exchange {} instrumentid {}", itr.Market,
55            //        itr.StockCode);
56            from_itp(itr, pos);
57            pos.holder_uid = get_home_uid();
58            pos.source_op_id = get_home_uid();
59            pos.source_id = get_home_uid();
60            pos.ledger_category = LedgerCategory::Account;
61            writer->close_data();
62            SPDLOG_TRACE("req_position: {}", pos.to_string());
63            SPDLOG_TRACE("req_position StockName:{} -- "
64                        "AccountId:{}; StockCode:{}; CurrentQty:{}; BrowIndex:{}; "
65                        "DiluteCostPrice:{}; AvgBuyPrice: {}; "
66                        "CostBalance: {}",
67                        gbk2utf8(itr.StockName), itr.AccountId, itr.StockCode,
68                        (long)itr.CurrentQty, (long)itr.BrowIndex,
69                        itr.DiluteCostPrice, itr.AvgBuyPrice, itr.CostBalance);
70        }
71        }
72        idx++;
73    }
74    if (update) {
75        auto &end = writer->open_data<PositionEnd>(now());
76        end.holder_uid = get_home_uid();
77        writer->close_data();
78        enable_positions_sync();
79    }
80    return true;
81}

req_account

查询资金

virtual bool req_account() = 0;

向券商柜台查询账户当前资金, TD进程进入就绪状态是会触发该函数, 同时系统也会每分钟自动触发一次查询, 每7秒内如果有成交也会触发一次查询, 以同步最新的资金.

将全局存储的账户等信息, 按照柜台api的要求直接填写或者构建所需的消息体后填写, 调用api完成持仓查询发送.

有的柜台支持同步查询和异步查询, 有的柜台只支持同步查询.

参数

参数

类型

说明

返回值

类型

说明

bool

查询资金成功返回true, 查询资金失败返回false

范例

 1// xtp 异步查询资金
 2bool TraderXTP::req_account() {
 3    SPDLOG_INFO("req_account");
 4    return api_->QueryAsset(session_id_, get_request_id()) == 0;
 5}
 6
 7bool TraderXTP::custom_OnQueryAsset(const XTPQueryAssetRsp &asset, const XTPRI &error_info, int request_id,
 8                                bool is_last, uint64_t session_id) {
 9    if (error_info.error_id != 0) {
10        SPDLOG_ERROR("error_id: {}, error_msg: {}, request_id: {}, last: {}", error_info.error_id, error_info.error_msg,
11                    request_id, is_last);
12    }
13
14    if (error_info.error_id == 0 || error_info.error_id == 11000350) {
15        SPDLOG_TRACE("OnQueryAsset: {}", to_string(asset));
16        auto writer = get_asset_writer();
17        Asset &account = writer->open_data<Asset>(0);
18        if (error_info.error_id == 0) {
19        from_xtp(asset, account);
20        }
21        account.holder_uid = get_live_home_uid();
22        account.update_time = yijinjing::time::now_in_nano();
23        SPDLOG_TRACE("Asset: {}", account.to_string());
24        writer->close_data();
25        enable_asset_sync();
26    }
27    return true;
28}
29
30// 顶点股票柜台只支持同步查询
31bool TraderItp::req_account() {
32    vector<ITPDK_ZJZH> arZjzh;
33    arZjzh.reserve(5);
34    long nRet = (long)SECITPDK_QueryFundInfo(get_account_id().c_str(), arZjzh);
35    if (nRet < 0) // 查询失败
36    {
37        string msg = SECITPDK_GetLastError();
38        SPDLOG_ERROR("req_account failed. Msg: {}", gbk2utf8(msg));
39        return false;
40    } else {
41        SPDLOG_TRACE("req_account success. Num of results {}", nRet);
42        for (auto &itr : arZjzh) {
43        SPDLOG_TRACE("FundAccount:{};AccountId:{} -- "
44                    "MoneyType:{};FundAvl:{};FrozenBalance:{};TotalAsset:{};"
45                    "MarketValue:{}",
46                    itr.FundAccount, itr.AccountId, itr.MoneyType, itr.FundAvl,
47                    itr.FrozenBalance, itr.TotalAsset, itr.MarketValue);
48        if (strcmp(itr.AccountId, get_account_id().c_str()) ==
49            0 /* && strcmp(itr.FundAccount, fund_id_.c_str()) == 0*/) {
50            auto writer = get_asset_writer();
51            auto &asset = writer->open_data<Asset>(0);
52            memset(&asset, 0, sizeof(Asset));
53            asset.update_time = yijinjing::time::now_in_nano();
54            asset.avail = itr.FundAvl;
55            asset.frozen_cash = itr.FrozenBalance;
56            asset.holder_uid = get_home_uid();
57            asset.ledger_category = LedgerCategory::Account;
58            writer->close_data();
59            enable_asset_sync();
60            return true;
61        }
62        }
63    }
64    return false;
65}

req_history_order

查询历史委托

virtual bool req_history_order(const event_ptr &event);

向券商柜台查询账户历史委托, 以下两个场景需要调用柜台API的查询历史接口

  1. TD重启后恢复今日委托的最新状态, 更新在关闭期间的委托状态变化到本地

  2. 策略想要获取今日的所有委托信息, 以HistoryOrder的数据格式进行推送

该接口为策略主动查询账户当日历史委托情况, 将全局存储的账户等信息, 按照柜台api的要求直接填写或者构建所需的消息体后填写, 调用api完成历史委托查询发送.

交易柜台通常只能查询到当前交易日内的历史委托情况, 有的柜台支持同步查询和异步查询, 有的柜台只支持同步查询.

参数

参数

类型

说明

event

const event_ptr &

包含该查询操作来源等信息

返回值

类型

说明

bool

查询历史委托成功返回true, 查询历史委托失败返回false

范例

  1// xtp 异步查询历史委托
  2bool TraderXTP::req_history_order(const event_ptr &event) {
  3    XTPQueryOrderReq query_param{};
  4    int request_id = get_request_id();
  5    int ret = api_->QueryOrders(&query_param, session_id_, request_id);
  6    if (0 != ret) {
  7        SPDLOG_ERROR("QueryOrders False: {}", ret);
  8    }
  9    map_request_location_.emplace(request_id, event->source()); // 标记为策略查询历史委托
 10    return 0 == ret;
 11}
 12
 13bool TraderXTP::custom_OnQueryOrder(const XTPOrderInfo &order_info, const XTPRI &error_info, int request_id,
 14                                bool is_last, uint64_t session_id) {
 15    SPDLOG_DEBUG("XTPOrderInfo: {}", to_string(order_info));
 16    SPDLOG_DEBUG("XTPRI: {}", to_string(error_info));
 17    SPDLOG_DEBUG("request_id: {}, is_last: {}", request_id, is_last);
 18
 19    if (order_info.order_xtp_id == 0 and is_last and
 20        map_request_location_.find(request_id) != map_request_location_.end()) {
 21        SPDLOG_WARN("XTPQueryOrderRsp* order_info == nullptr, no data returned!");
 22        auto writer = get_history_writer(request_id);
 23        HistoryOrder &history_order = writer->open_data<HistoryOrder>();
 24        history_order.is_last = true;
 25        history_order.data_type = HistoryDataType::TotalEnd;
 26        const std::string msg = "No order today";
 27        history_order.error_msg = msg.c_str();
 28        writer->close_data();
 29        SPDLOG_DEBUG("HistoryOrder: {}", history_order.to_string());
 30        return false;
 31    }
 32
 33    // 此处判断是属于启动TD时恢复查询还是策略查询
 34    if (map_request_location_.find(request_id) == map_request_location_.end()) {
 35        // TD重连收到推送当做普通下单委托响应处理
 36        if (is_last) {
 37            req_order_over_ = true;
 38            try_ready();
 39        }
 40        return order_info.order_xtp_id != 0 and custom_OnOrderEvent(order_info, error_info, request_id);
 41    }
 42
 43    auto writer = get_history_writer(request_id);
 44    HistoryOrder &history_order = writer->open_data<HistoryOrder>();
 45
 46    if (error_info.error_id != 0) {
 47        SPDLOG_ERROR("OnQueryOrder False , error_code : {}, error_msg : {}", error_info.error_id, error_info.error_msg);
 48        history_order.error_id = error_info.error_id;
 49        history_order.error_msg = error_info.error_msg;
 50    }
 51
 52    from_xtp(order_info, history_order);
 53    history_order.order_id = writer->current_frame_uid();
 54    history_order.is_last = is_last;
 55    history_order.insert_time = yijinjing::time::now_in_nano();
 56    history_order.update_time = history_order.insert_time;
 57    SPDLOG_DEBUG("HistoryOrder: {}", history_order.to_string());
 58    writer->close_data();
 59    return true;
 60}
 61
 62
 63// xtp在启动时查询今日委托和成交信息, 没有标记 map_request_location_, custom_OnQueryOrder会走到custom_OnOrderEvent作为普通委托处理
 64void TraderXTP::req_order_trade() {
 65    if (disable_recover_) {
 66        return try_ready();
 67    }
 68
 69    XTPQueryOrderReq query_order_param{};
 70    int ret = api_->QueryOrders(&query_order_param, session_id_, get_request_id());
 71    if (0 != ret) {
 72        SPDLOG_ERROR("QueryOrders False: {}", ret);
 73    }
 74
 75    XTPQueryTraderReq query_trade_param{};
 76    ret = api_->QueryTrades(&query_trade_param, session_id_, get_request_id());
 77    if (0 != ret) {
 78        SPDLOG_ERROR("QueryTrades False : {}", ret);
 79    }
 80}
 81
 82
 83// 顶点股票柜台同步查询历史委托
 84bool TraderItp::req_history_order(const event_ptr &event) {
 85    SPDLOG_INFO("req_history_order");
 86    const RequestHistoryOrder &request = event->data<RequestHistoryOrder>();
 87    SPDLOG_INFO("RequestHistoryOrder: {}", request.to_string());
 88    uint32_t source = event->source();
 89    uint32_t limit = request.query_num;
 90
 91    auto writer = get_writer(event->source());
 92    if (req_history_order_query_.find(source) == req_history_order_query_.end()) {
 93        long nRet = 1;
 94        std::vector<ITPDK_DRWT> his_orders;
 95        req_history_order_query_.insert(std::make_pair(source, his_orders));
 96        int64 idx = 0;
 97        while (nRet > 0) {
 98        vector<ITPDK_DRWT> arDrwt;
 99        arDrwt.reserve(200); // 需要预分配足够空间,查询结果最大返回200条
100        nRet =
101            (long)SECITPDK_QueryOrders(get_account_id().c_str(), 0, SORT_TYPE_AES,
102                                        200, idx, "", "", 0, arDrwt);
103        req_history_order_query_[source].insert(
104            req_history_order_query_[source].end(), arDrwt.begin(), arDrwt.end());
105        idx = arDrwt.back().BrowIndex + 1;
106
107        if (nRet < 0) {
108            std::string error_msg = gbk2utf8(SECITPDK_GetLastError());
109            SPDLOG_ERROR("req_history_order ERROR! SECITPDK_QueryOrders return: {} "
110                        ", error message : {}",
111                        nRet, error_msg);
112
113            RequestHistoryOrderError &error =
114                writer->open_data<RequestHistoryOrderError>(event->gen_time());
115            error.error_id = nRet;
116            strncpy(error.error_msg, error_msg.c_str(), ERROR_MSG_LEN);
117            error.trigger_time = event->gen_time();
118            writer->close_data();
119            return false;
120        }
121        }
122        size_t ret_size = req_history_order_query_[source].size();
123        SPDLOG_INFO("req_history_order success. Num of results {}", ret_size);
124    }
125    auto it = req_history_order_query_[source].begin();
126    while (it != req_history_order_query_[source].end() && limit > 0) {
127        HistoryOrder &history_order = writer->open_data<HistoryOrder>(now());
128        from_itp(*it, history_order, trading_day_);
129        history_order.order_id = writer->current_frame_uid();
130        limit--;
131        it++;
132        if (it == req_history_order_query_[source].end()) {
133        history_order.data_type = HistoryDataType::TotalEnd;
134        } else if (limit == 0) {
135        history_order.data_type = HistoryDataType::PageEnd;
136        } else {
137        history_order.data_type = HistoryDataType::Normal;
138        }
139        writer->close_data();
140    }
141    if (it == req_history_order_query_[source].end()) {
142        req_history_order_query_.erase(source);
143    } else {
144        req_history_order_query_[source].erase(
145            req_history_order_query_[source].begin(),
146            req_history_order_query_[source].begin() + request.query_num);
147    }
148    return true;
149}

req_history_trade

查询历史成交

virtual bool req_history_trade(const event_ptr &event);

向券商柜台查询账户历史成交, 以下两个场景需要调用柜台API的查询历史接口

  1. TD重启后恢复今日成交, 更新在关闭期间的成交到本地

  2. 策略想要获取今日的所有成交信息, 以HistoryTrade的数据格式进行推送

该接口为策略主动查询账户当日历史成交情况, 将全局存储的账户等信息, 按照柜台api的要求直接填写或者构建所需的消息体后填写, 调用api完成历史成交查询发送.

交易柜台通常只能查询到当前交易日内的历史成交情况, 有的柜台支持同步查询和异步查询, 有的柜台只支持同步查询.

参数

参数

类型

说明

event

const event_ptr &

包含该查询操作来源等信息

返回值

类型

说明

bool

查询历史成交成功返回true, 查询历史成交失败返回false

范例

  1//
  2bool TraderXTP::req_history_trade(const event_ptr &event) {
  3    XTPQueryTraderReq query_param{};
  4    int request_id = get_request_id();
  5    int ret = api_->QueryTrades(&query_param, session_id_, request_id);
  6    if (0 != ret) {
  7        SPDLOG_ERROR("QueryTrades False : {}", ret);
  8    }
  9    map_request_location_.emplace(request_id, event->source());
 10    return 0 == ret;
 11}
 12
 13bool TraderXTP::custom_OnQueryTrade(const XTPTradeReport &trade_info, const XTPRI &error_info, int request_id,
 14                                bool is_last, uint64_t session_id) {
 15    SPDLOG_DEBUG("XTPTradeReport: {}", to_string(trade_info));
 16    SPDLOG_DEBUG("XTPRI: {}", to_string(error_info));
 17    SPDLOG_DEBUG("request_id: {}, is_last: {}", request_id, is_last);
 18
 19    // 查询历史流水收到nullptr, 经过journal走一圈后表现形式为 order_xtp_id == 0
 20    if (trade_info.order_xtp_id == 0 and is_last and
 21        map_request_location_.find(request_id) != map_request_location_.end()) {
 22        SPDLOG_WARN("XTPQueryTradeRsp* trade_info == nullptr, no data returned!");
 23        auto writer = get_history_writer(request_id);
 24        HistoryTrade &history_trade = writer->open_data<HistoryTrade>(now());
 25        history_trade.is_last = true;
 26        history_trade.data_type = HistoryDataType::TotalEnd;
 27        const std::string msg = "No trade today";
 28        history_trade.error_msg = msg.c_str();
 29        writer->close_data();
 30        return false;
 31    }
 32
 33    if (map_request_location_.find(request_id) == map_request_location_.end()) {
 34        // TD重连收到推送当做普通交易成交回报推送处理
 35        if (is_last) {
 36        req_trade_over_ = true;
 37        try_ready();
 38        }
 39        return trade_info.order_xtp_id != 0 and custom_OnTradeEvent(trade_info, session_id);
 40    }
 41
 42    auto writer = get_history_writer(request_id);
 43    HistoryTrade &history_trade = writer->open_data<HistoryTrade>(now());
 44
 45    if (error_info.error_id != 0) {
 46        SPDLOG_ERROR("OnQueryTrade False , error_code : {}, error_msg : {}", error_info.error_id, error_info.error_msg);
 47        history_trade.error_id = error_info.error_id;
 48        history_trade.error_msg = error_info.error_msg;
 49    }
 50
 51    from_xtp(trade_info, history_trade);
 52    history_trade.trade_id = writer->current_frame_uid();
 53    history_trade.is_last = is_last;
 54    history_trade.trade_time = yijinjing::time::now_in_nano();
 55    history_trade.instrument_type = get_instrument_type(history_trade.exchange_id, history_trade.instrument_id);
 56    SPDLOG_DEBUG("HistoryTrade: {}", history_trade.to_string());
 57    writer->close_data();
 58    return false;
 59}
 60
 61
 62// 顶点股票柜台同步查询历史委托
 63bool TraderItp::req_history_trade(const event_ptr &event) {
 64    SPDLOG_INFO("req_history_trade");
 65    const RequestHistoryTrade &request = event->data<RequestHistoryTrade>();
 66    SPDLOG_INFO("RequestHistoryTrade: {}", request.to_string());
 67    uint32_t source = event->source();
 68    uint32_t limit = request.query_num;
 69
 70    auto writer = get_writer(event->source());
 71    if (req_history_trade_query_.find(source) == req_history_trade_query_.end()) {
 72        long nRet = 1;
 73        std::vector<ITPDK_SSCJ> his_trades;
 74        req_history_trade_query_.insert(std::make_pair(source, his_trades));
 75        int64 idx = 0;
 76        while (nRet > 0) {
 77        vector<ITPDK_SSCJ> arSscj;
 78        arSscj.reserve(200); // 需要预分配足够空间,查询结果最大返回200条
 79        nRet =
 80            (long)SECITPDK_QueryMatchs(get_account_id().c_str(), 0, SORT_TYPE_AES,
 81                                        200, idx, "", "", 0, arSscj);
 82        req_history_trade_query_[source].insert(
 83            req_history_trade_query_[source].end(), arSscj.begin(), arSscj.end());
 84        idx = arSscj.back().BrowIndex + 1;
 85
 86        if (nRet < 0) {
 87            std::string error_msg = gbk2utf8(SECITPDK_GetLastError());
 88            SPDLOG_ERROR("req_history_trade ERROR! SECITPDK_QueryMatchs return: {} "
 89                        ", error message : {}",
 90                        nRet, error_msg);
 91            RequestHistoryTradeError &error =
 92                writer->open_data<RequestHistoryTradeError>(event->gen_time());
 93            error.error_id = nRet;
 94            strncpy(error.error_msg, error_msg.c_str(), ERROR_MSG_LEN);
 95            error.trigger_time = event->gen_time();
 96            writer->close_data();
 97            return false;
 98        }
 99        }
100        size_t ret_size = req_history_trade_query_[source].size();
101        SPDLOG_DEBUG("req_history_trade success. Num of results {}", ret_size);
102    }
103    auto it = req_history_trade_query_[source].begin();
104    while (it != req_history_trade_query_[source].end() && limit > 0) {
105        HistoryTrade &history_trade = writer->open_data<HistoryTrade>(now());
106        from_itp(*it, history_trade, trading_day_);
107        history_trade.trade_id = writer->current_frame_uid();
108        limit--;
109        it++;
110        if (it == req_history_trade_query_[source].end()) {
111        history_trade.data_type = HistoryDataType::TotalEnd;
112        } else if (limit == 0) {
113        history_trade.data_type = HistoryDataType::PageEnd;
114        } else {
115        history_trade.data_type = HistoryDataType::Normal;
116        }
117        writer->close_data();
118    }
119    if (it == req_history_trade_query_[source].end()) {
120        req_history_trade_query_.erase(source);
121    } else {
122        req_history_trade_query_[source].erase(
123            req_history_trade_query_[source].begin(),
124            req_history_trade_query_[source].begin() + request.query_num);
125    }
126    return true;
127}

req_order_trigger

查询预埋单

virtual bool req_order_trigger();

向券商柜台查询预埋单最新状态, 预埋单触发或者撤销后, 不会实时推送预埋单的最新状态, 需要手动查询

参数

参数

类型

说明

event

const event_ptr &

包含该查询操作来源等信息

返回值

类型

说明

bool

查询历史成交成功返回true, 查询历史成交失败返回false

范例

 1// ctp的查询预埋单的实现
 2bool TraderCTP::req_order_trigger() { return req_trigger(); }
 3
 4bool TraderCTP::req_trigger() {
 5    if (disable_recover_) {
 6        return false;
 7    }
 8
 9    add_timer_req_insert_trigger(now() + 1 * time_unit::NANOSECONDS_PER_SECOND);
10    add_timer_req_cancel_trigger(now() + 3 * time_unit::NANOSECONDS_PER_SECOND);
11    return true;
12}
13
14// 查询预埋下单
15void TraderCTP::add_timer_req_insert_trigger(int64_t nano) {
16    add_timer(nano, [&](const auto &event) {
17        CThostFtdcQryParkedOrderField req_parked{};
18        strcpy(req_parked.BrokerID, config_.broker_id.c_str());
19        strcpy(req_parked.InvestorID, config_.account_id.c_str());
20        SPDLOG_INFO("CThostFtdcQryParkedOrderField: {}", to_string(req_parked));
21        int request_id = get_request_id();
22        int rtn = api_->ReqQryParkedOrder(&req_parked, request_id);
23        SPDLOG_INFO("ReqQryTrade rtn {}", rtn);
24        if (rtn != 0) {
25        add_timer_req_insert_trigger(time::now_in_nano() + 3 * time_unit::NANOSECONDS_PER_SECOND);
26        }
27    });
28}
29
30// 查询预埋撤单
31void TraderCTP::add_timer_req_cancel_trigger(int64_t nano) {
32    add_timer(nano, [&](const auto &event) {
33        CThostFtdcQryParkedOrderActionField req_parked_action{};
34        strcpy(req_parked_action.BrokerID, config_.broker_id.c_str());
35        strcpy(req_parked_action.InvestorID, config_.account_id.c_str());
36        SPDLOG_INFO("CThostFtdcQryParkedOrderActionField: {}", to_string(req_parked_action));
37        int request_id = get_request_id();
38        int rtn = api_->ReqQryParkedOrderAction(&req_parked_action, request_id);
39        SPDLOG_INFO("ReqQryTrade rtn {}", rtn);
40        if (rtn != 0) {
41        add_timer_req_cancel_trigger(time::now_in_nano() + 3 * time_unit::NANOSECONDS_PER_SECOND);
42        }
43    });
44}

备注

ctp有流控限制, 连续查询可能会失败, 失败后可以添加一个定时器3秒后再查一次.


req_contract

两融合约查询

virtual bool req_contract();

向券商柜台查询两融合约状态, 融资买入和融券卖出成交后, 两融合约不会实时推送, 想要获取当前最新的两融合约状态需要手动查询.

范例

 1// 华锐两融查询合约
 2bool TraderATP::req_contract() { return req_contractSpecifications(false); }
 3
 4// 融资融券合约明细查询函数
 5bool TraderATP::req_contractSpecifications(bool is_query_position) {
 6    if (!req_contract_status_) {
 7        // 如果上次点击前端按钮查合约 还没有结束 就不能重复查询
 8        // 因为查持仓那里已经有防止重复查的开关了 所以持仓查合约是不会重复查的 可以不做处理
 9        SPDLOG_ERROR("上次更新尚未结束 请稍后再点击更新合约按钮");
10        return false;
11    }
12    if (!is_query_position) {
13        req_contract_status_ = false;
14    }
15    SPDLOG_INFO("融资融券合约明细查询 req_contractSpecifications");
16    ATPReqExtQueryContractSpecificationsMsg req_contract_specifications_msg{};
17
18    strcpy(req_contract_specifications_msg.cust_id, cust_id_.c_str()); /// 客户号ID(必填)
19    strcpy(req_contract_specifications_msg.fund_account_id, td_config_.fund_account_id.c_str()); /// 资金账户ID(必填)
20    int64_t seq_id = get_client_seq_id();
21    if (is_query_position) {
22        set_contract_id_.insert(seq_id);
23    }
24    req_contract_specifications_msg.client_seq_id = seq_id; /// 用户系统消息序号(必填)
25    strcpy(req_contract_specifications_msg.branch_id, td_config_.branch_id.c_str()); /// 营业部ID(必填)
26    ATPRetCodeType ret = atp_trader_api_ptr_->ReqExtQueryContractSpecifications(&req_contract_specifications_msg);
27    SPDLOG_INFO("req_contract return code : {} , "
28                "return message : {}",
29                ret, map_error_code.try_emplace(ret).first->second);
30    if (ret != ErrorCode::kSuccess) {
31        SPDLOG_ERROR("req_contract error return code : {} , "
32                    "return message : {}",
33                    ret, map_error_code.try_emplace(ret).first->second);
34    }
35    return true;
36}

on_band

virtual void on_band(const event_ptr &event);

当有一个进程调用request_band创建一个写入信道时, master会广播这条信道的信息, 所有的进程都会收到一个Band信息, 可以在on_band中选择是否要订阅这条信道.

一般场景下不会用到该接口.


主要主调接口

update_broker_state

void update_broker_state(BrokerState state);

修改TD的状态

范例

 1// xtp登录失败后将TD状态设置成LoginFailed
 2void TraderXTP::on_start() {
 3    if (config_.client_id < 1 or config_.client_id > 99) {
 4        SPDLOG_ERROR("client_id must between 1 and 99");
 5    }
 6    std::string runtime_folder = get_runtime_folder();
 7    SPDLOG_INFO("Connecting XTP account {} with tcp://{}:{}", config_.account_id, config_.td_ip, config_.td_port);
 8    api_ = XTP::API::TraderApi::CreateTraderApi(config_.client_id, runtime_folder.c_str());
 9    api_->RegisterSpi(this);
10    api_->SubscribePublicTopic(XTP_TERT_QUICK);
11    api_->SetSoftwareVersion("1.1.0");
12    api_->SetSoftwareKey(config_.software_key.c_str());
13    session_id_ = api_->Login(config_.td_ip.c_str(), config_.td_port, config_.account_id.c_str(),
14                                config_.password.c_str(), XTP_PROTOCOL_TCP);
15    if (session_id_ > 0) {
16        SPDLOG_INFO("Login successfully");
17        req_order_trade();
18    } else {
19        update_broker_state(BrokerState::LoginFailed);
20        SPDLOG_ERROR("Login failed [{}]: {}", api_->GetApiLastError()->error_id, api_->GetApiLastError()->error_msg);
21    }
22}
23
24// xtp恢复完委托和成交后将TD状态设置成Ready
25void TraderXTP::try_ready() {
26    if (BrokerState::Ready == get_state()) {
27        return;
28    }
29
30    SPDLOG_DEBUG("req_order_over_: {}, req_trade_over_: {}", req_order_over_, req_trade_over_);
31    if (disable_recover_ or (req_order_over_ and req_trade_over_)) {
32        update_broker_state(BrokerState::Ready);
33    }
34}

get_orders

const OrderMap &get_orders();

获取内存里的所有委托Order

范例

 1// 在on_recover中根据委托恢复Kungfu的order_id和xtp的委托号映射关系
 2for (auto &pair : get_orders()) {
 3    SPDLOG_DEBUG("Order: {}", pair.second.data.to_string());
 4    const std::string str_external_order_id = pair.second.data.external_order_id.to_string();
 5    if (not str_external_order_id.empty()) {
 6    uint64_t order_id = pair.first;
 7    uint64_t order_xtp_id = std::stoull(str_external_order_id);
 8    map_xtp_to_kf_order_id_.emplace(order_xtp_id, order_id);
 9    map_kf_to_xtp_order_id_.emplace(order_id, order_xtp_id);
10    }
11}

get_trades

const TradeMap &get_trades();

获取内存里的所有成交Trade

范例

1// 在on_recover中根据成交恢复已处理的成交编号
2for (auto &pair : get_trades()) {
3    SPDLOG_DEBUG("Trade: {}", pair.second.data.to_string());
4    uint64_t order_xtp_id = std::stoull(pair.second.data.external_order_id);
5    map_xtp_order_id_to_xtp_trader_ids_.try_emplace(order_xtp_id)
6        .first->second.emplace(pair.second.data.external_trade_id.to_string());
7}

get_order_triggers

const OrderTriggerMap &get_order_triggers()

获取内存里的所有预埋单OrderTrigger

范例

1// ctp在on_recover中根据预埋单恢复Kungfu的trigger_id和ctp的预埋单编号映射
2for (const auto &trigger_pair : get_order_triggers()) {
3    const OrderTrigger &trigger = trigger_pair.second.data;
4    map_trigger_id_to_ParkedOrderID_.insert_or_assign(
5        trigger.trigger_id, std::pair<std::string, bool>{trigger.external_trigger_id,
6                                                        trigger.action_flag == OrderTriggerFlag::TriggerCancel});
7    map_ParkedOrderId_to_trigger_id_.insert_or_assign(trigger.external_trigger_id, trigger.trigger_id);
8    SPDLOG_DEBUG("OrderTrigger: {}", trigger.to_string());
9}

disable_recover

void disable_recover();

关闭恢复委托和成交, 在pre_start调用改接口后, 重启td之后不再恢复关闭前的委托数据, 并且将处于未完成状态的委托设置成 “丢失” 状态. 如果不调用该接口, 重启td时, 会自动从journal和db中读取从昨天下午4点开始的到现在的所有委托和成交数据, 恢复到内存中, 可以通过get_orders(), get_trades(), get_order_triggers()获取数据

范例

1// xtp在pre_start中根据配置信息设置是否要关闭恢复委托和成交
2void TraderXTP::pre_start() {
3    config_ = nlohmann::json::parse(get_config());
4    SPDLOG_INFO("config: {}", get_config());
5    if (not config_.recover_order_trade) {
6        disable_recover();
7    }
8}

has_writer

bool has_writer(uint32_t dest_id) const;

判断是否存在写给dest的writer

get_writer

writer_ptr get_writer(uint32_t dest_id) const;

获取写给dest的writer

范例

 1// xtp在处理成交数据时, 根据是否有writer来决定是否可以使用get_writer
 2if (has_writer(order_state.dest)) {
 3    auto writer = get_writer(order_state.dest);
 4    Trade &trade = writer->open_data<Trade>(now());
 5    from_xtp(trade_info, trade);
 6    trade.trade_id = writer->current_frame_uid();
 7    trade.order_id = kf_order_id;
 8    add_traded_volume(trade_info.order_xtp_id, trade.volume);
 9    SPDLOG_DEBUG("Trade: {}", trade.to_string());
10    writer->close_data();
11} else {
12    Trade trade{};
13    from_xtp(trade_info, trade);
14    trade.trade_id = get_public_writer()->current_frame_uid() xor (time::now_in_nano() & 0xFFFFFFFF);
15    trade.order_id = kf_order_id;
16    add_traded_volume(trade_info.order_xtp_id, trade.volume);
17    SPDLOG_DEBUG("Trade: {}", trade.to_string());
18    try_write_to(trade, order_state.dest);
19}

get_public_writer

writer_ptr &get_public_writer()

dest为0的writer单独使用一个变量存放, 调用该接口直接返回写入dest为0的writer, 效果等价于get_writer(0), 区别是该接口不涉及stl容器访问, 可以在子线程调用

范例

 1// xtp生成系统外订单时, 将数据写入到PUBLIC的journal
 2auto writer = get_public_writer();
 3auto nano = yijinjing::time::now_in_nano();
 4Order &order = writer->open_data<Order>(now());
 5order.order_id = writer->current_frame_uid();
 6from_xtp(order_info, order);
 7order.insert_time = nsec_from_xtp_timestamp(order_info.insert_time);
 8order.update_time = nano;
 9map_kf_to_xtp_order_id_.emplace(uint64_t(order.order_id), order_info.order_xtp_id);
10map_xtp_to_kf_order_id_.emplace(order_info.order_xtp_id, uint64_t(order.order_id));
11SPDLOG_DEBUG("Order: {}", order.to_string());
12writer->close_data();
13try_deal_XTPTradeReport(order_info.order_xtp_id);

open_data<T>

template <typename T> std::enable_if_t<size_fixed_v<T>, T &> open_data(int64_t trigger_time = 0);

该接口是属于writer的接口, 用于生成Order, Trade 等数据使用

close_data

void close_data(int64_t gen_time = time::now_in_nano());

该接口是属于writer的接口, 用于在open_data完成数据写入后, 标记写入完成;

open_data和close_data必须配对使用, 同一个writer中间不可以进行嵌套

范例

 1// xtp生成系统外订单时, 使用open_data和close_data对来生成Order
 2auto writer = get_public_writer();
 3auto nano = yijinjing::time::now_in_nano();
 4Order &order = writer->open_data<Order>(now());
 5order.order_id = writer->current_frame_uid();
 6from_xtp(order_info, order);
 7order.insert_time = nsec_from_xtp_timestamp(order_info.insert_time);
 8order.update_time = nano;
 9map_kf_to_xtp_order_id_.emplace(uint64_t(order.order_id), order_info.order_xtp_id);
10map_xtp_to_kf_order_id_.emplace(order_info.order_xtp_id, uint64_t(order.order_id));
11SPDLOG_DEBUG("Order: {}", order.to_string());
12writer->close_data();
13try_deal_XTPTradeReport(order_info.order_xtp_id);

data<T>

template <typename T> std::enable_if_t<size_fixed_v<T> or std::is_same_v<T, nlohmann::json>, const T &> data() const;

template <typename T> std::enable_if_t<not size_fixed_v<T> and not std::is_same_v<T, nlohmann::json>, const T> data() const;

该接口属于event类, 一般用于在回调接口中, 通过event指针获取数据, 如果数据类型是没有类似于string这样的长度大小固定的, 返回的就是数据的引用;

如果是Register这种有string类型长度未定的, 返回的就是数据的拷贝.

范例

1// insert_order中获取OrderInput
2const OrderInput &input = event->data<OrderInput>();
3
4// cancel_order中获取OrderAction
5const OrderAction &action = event->data<OrderAction>();

order_from_input

inline void order_from_input(const longfist::types::OrderInput &input, longfist::types::Order &order);

根据OrderInput生成Order, 主要用于在insert_order时将数据进行转换.

范例

 1// 在insert_order中根据OrderInput生成OrderInput
 2Order &order = writer->open_data<Order>(event->gen_time());
 3order_from_input(input, order);
 4order.external_order_id = std::to_string(order_xtp_id).c_str();
 5order.insert_time = nano;
 6order.update_time = nano;
 7
 8if (success) {
 9    map_kf_to_xtp_order_id_.emplace(uint64_t(input.order_id), order_xtp_id);
10    map_xtp_to_kf_order_id_.emplace(order_xtp_id, uint64_t(input.order_id));
11} else {
12    auto error_info = api_->GetApiLastError();
13    order.error_id = error_info->error_id;
14    order.error_msg = error_info->error_msg;
15    order.status = OrderStatus::Error;
16}
17
18SPDLOG_DEBUG("Order: {}", order.to_string());
19writer->close_data();
20
21// 在收到成交推送后写入Trade
22if (has_writer(order_state.dest)) {
23    auto writer = get_writer(order_state.dest);
24    Trade &trade = writer->open_data<Trade>(now());
25    from_xtp(trade_info, trade);
26    trade.trade_id = writer->current_frame_uid();
27    trade.order_id = kf_order_id;
28    add_traded_volume(trade_info.order_xtp_id, trade.volume);
29    SPDLOG_DEBUG("Trade: {}", trade.to_string());
30    writer->close_data();
31} else {
32    Trade trade{};
33    from_xtp(trade_info, trade);
34    trade.trade_id = get_public_writer()->current_frame_uid() xor (time::now_in_nano() & 0xFFFFFFFF);
35    trade.order_id = kf_order_id;
36    add_traded_volume(trade_info.order_xtp_id, trade.volume);
37    SPDLOG_DEBUG("Trade: {}", trade.to_string());
38    try_write_to(trade, order_state.dest);
39}

write_to

template <typename DataType> void write_to(const DataType &data, uint32_t dest_id = yijinjing::data::location::PUBLIC);

将数据类型为DataType的数据data, 写入到dest为dest_id的journal中, 其效果等价于以下实现

1auto writer = get_writer(dest);
2auto &data_to_write = writer->open_data<DataType>(now());
3memcpy(&data_to_write, &data, sizeof(DataType));
4writer->close_data();

备注

  1. 调用get_writer和write_to的前提条件是, 存在写给dest_id的writer, 如果不存在, 则会报错崩溃.

  2. 与open_data再close_data的区别是, write_to是把已经存在数据拷贝一份到共享内存里, 如果数据已经存在, 不需要从柜台数据进行转换获得, 两种方式执行效率没有区别, 如果数据本身不存在, 需要从柜台API进行转换获取, 使用open_data和close_data可以直接将数据转换到共享内存里, 少一次数据拷贝.


try_write_to

template <typename DataType> void try_write_to(const DataType &data, uint32_t dest_id = yijinjing::data::location::PUBLIC, const std::function<void()> &callback = []() {});

执行效果类似于write_to, 不同的是两点

  1. 写完以后会执行一个callback函数

  2. 如果不存在dest_id的writer, 则会创建该writer之后再进行写入

 1// xtp收到委托推送时, 不管是td重启恢复委托后查询, 还是正常交易中收到数据, 使用try_write_to更新Order可以在没有dest的writer时可以写入
 2bool TraderXTP::custom_OnOrderEvent(const XTPOrderInfo &order_info, const XTPRI &error_info, uint64_t session_id) {
 3SPDLOG_DEBUG("XTPOrderInfo: {}", to_string(order_info));
 4SPDLOG_DEBUG("session_id: {}, XTPRI: {}", session_id, to_string(error_info));
 5
 6auto order_xtp_id_iter = map_xtp_to_kf_order_id_.find(order_info.order_xtp_id);
 7if (order_xtp_id_iter == map_xtp_to_kf_order_id_.end()) {
 8    SPDLOG_WARN("unrecognized order_xtp_id {}@{}", order_info.order_xtp_id, trading_day_);
 9    return generate_external_order(order_info);
10}
11
12uint64_t kf_order_id = order_xtp_id_iter->second;
13if (not has_order(kf_order_id)) {
14    return generate_external_order(order_info);
15}
16
17auto &order_state = get_order(kf_order_id);
18if (not is_final_status(order_state.data.status) or order_state.data.status == OrderStatus::Lost) {
19    from_xtp_no_price_type(order_info, order_state.data);
20    order_state.data.update_time = yijinjing::time::now_in_nano();
21    if (error_info.error_id != 0) {
22    order_state.data.error_id = error_info.error_id;
23    order_state.data.error_msg = error_info.error_msg;
24    }
25    try_write_to(order_state.data, order_state.dest);
26    SPDLOG_DEBUG("Order: {}", order_state.data.to_string());
27    try_deal_XTPTradeReport(order_info.order_xtp_id);
28}
29return true;
30}

get_thread_writer

writer_ptr &get_thread_writer();

获取线程私有的writer, 作用是给柜台API回调子线程将获取的数据写入到共享内存里, 让主线程从journal读取后再做业务处理, 可以避免容器加锁问题的同时对数据落地, 方便回放

open_custom_data<T>

template <typename T> T &open_custom_data(int32_t msg_type, int64_t trigger_time = 0)

作用于open_data类似, 区别是专门用于写入Kungfu在types.h中定义之外的数据, 同样需要和close_data配对, 同一个writer不能有嵌套

范例

 1// xtp子线程中将成交数据写入到共享内存journal里
 2auto &bf_order_info = get_thread_writer()->open_custom_data<BufferXTPOrderInfo>(kXTPOrderInfoType, now());
 3memcpy(&bf_order_info.order_info, order_info, sizeof(XTPOrderInfo));
 4bf_order_info.session_id = session_id;
 5if (error_info != nullptr) {
 6    memcpy(&bf_order_info.error_info, error_info, sizeof(XTPRI));
 7} else {
 8    memset(&bf_order_info.error_info, 0, sizeof(XTPRI));
 9}
10SPDLOG_DEBUG("BufferXTPOrderInfo: {}", to_string(bf_order_info));
11get_thread_writer()->close_data();

has_order

bool has_order(uint64_t order_id) const;

判断是否存在改order_id的委托

get_order

state<Order> &get_order(uint64_t order_id);

根据order_id获取对应的委托, 如果不存在对应的委托会崩溃, 需要先调用has_order判断是否存在该委托

is_final_status

bool is_final_status(const longfist::enums::OrderStatus &status)

判断委托是否处于最终状态, 一般用于在收到委托推送或者成交推送时, 需要修改Order的成交数量和状态信息, 可能出现乱序推送的情况, 导致先处理了最终状态的推送, 再收到中间状态的推送, 当Order已经处于最终状态时, 不再修改Order.

范例

 1// xtp收到委托推送, 根据映射获取的order_id判断是否存在该委托, 再获取对应的Order
 2bool TraderXTP::custom_OnOrderEvent(const XTPOrderInfo &order_info, const XTPRI &error_info, uint64_t session_id) {
 3    SPDLOG_DEBUG("XTPOrderInfo: {}", to_string(order_info));
 4    SPDLOG_DEBUG("session_id: {}, XTPRI: {}", session_id, to_string(error_info));
 5
 6    auto order_xtp_id_iter = map_xtp_to_kf_order_id_.find(order_info.order_xtp_id);
 7    if (order_xtp_id_iter == map_xtp_to_kf_order_id_.end()) {
 8        SPDLOG_WARN("unrecognized order_xtp_id {}@{}", order_info.order_xtp_id, trading_day_);
 9        return generate_external_order(order_info);
10    }
11
12    uint64_t kf_order_id = order_xtp_id_iter->second;
13    if (not has_order(kf_order_id)) {
14        return generate_external_order(order_info);
15    }
16
17    auto &order_state = get_order(kf_order_id);
18    if (not is_final_status(order_state.data.status) or order_state.data.status == OrderStatus::Lost) {
19        from_xtp_no_price_type(order_info, order_state.data);
20        order_state.data.update_time = yijinjing::time::now_in_nano();
21        if (error_info.error_id != 0) {
22        order_state.data.error_id = error_info.error_id;
23        order_state.data.error_msg = error_info.error_msg;
24        }
25        try_write_to(order_state.data, order_state.dest);
26        SPDLOG_DEBUG("Order: {}", order_state.data.to_string());
27        try_deal_XTPTradeReport(order_info.order_xtp_id);
28    }
29    return true;
30}

has_order_action

bool has_order_action(uint64_t action_id) const;

判断是否存在该action_id的撤单操作

get_order_action

state<OrderAction> &get_order_action(uint64_t action_id);

根据action_id获取对应的撤单操作, 如果不存在对应的撤单操作会崩溃, 需要先调用has_order_action判断是否存在该委托

范例

 1// xtp撤单报错时, 根据撤单时构建的映射关系获取action_id后, 根据action_id判断是否存在对应的撤单操作, 获取撤单操作后生成OrderActionError
 2bool TraderXTP::custom_OnCancelOrderError(const XTPOrderCancelInfo &cancel_info, const XTPRI &error_info,
 3                                        uint64_t session_id) {
 4SPDLOG_DEBUG("XTPOrderCancelInfo: {}", to_string(cancel_info));
 5SPDLOG_DEBUG("session_id: {}, XTPRI: {}", session_id, to_string(error_info));
 6
 7uint64_t action_id = get_action_id(cancel_info.order_xtp_id);
 8if (not has_order_action(action_id)) {
 9    SPDLOG_WARN("has not related OrderAction of {}:{}", cancel_info.order_xtp_id, action_id);
10    return false;
11}
12
13auto action_state = get_order_action(action_id);
14auto order_id = action_state.data.order_id;
15if (not has_order(order_id)) {
16    SPDLOG_WARN("order_id not in orders_ {}", order_id);
17    return false;
18}
19
20auto order_state = get_order(order_id);
21if (has_writer(order_state.dest)) {
22    OrderActionError &error = get_writer(order_state.dest)->open_data<OrderActionError>(now());
23    error.order_id = order_state.data.order_id; // 订单ID
24    std::string str_external_order_id = std::to_string(cancel_info.order_xtp_id);
25    error.external_order_id = str_external_order_id.c_str();
26    error.order_action_id = action_id;       // 订单操作ID,
27    error.error_id = error_info.error_id;    // 错误ID
28    error.error_msg = error_info.error_msg;  // 错误信息
29    error.insert_time = time::now_in_nano(); // 写入时间
30    SPDLOG_DEBUG("OrderActionError: {}", error.to_string());
31    get_writer(order_state.dest)->close_data();
32} else {
33    OrderActionError error{};
34    error.order_id = order_state.data.order_id; // 订单ID
35    std::string str_external_order_id = std::to_string(cancel_info.order_xtp_id);
36    error.external_order_id = str_external_order_id.c_str();
37    error.order_action_id = action_id;       // 订单操作ID,
38    error.error_id = error_info.error_id;    // 错误ID
39    error.error_msg = error_info.error_msg;  // 错误信息
40    error.insert_time = time::now_in_nano(); // 写入时间
41    SPDLOG_DEBUG("OrderActionError: {}", error.to_string());
42    try_write_to(error, order_state.dest);
43}
44return true;
45}

has_order_trigger

bool has_order_trigger(uint64_t trigger_id) const;

判断是否存在该trigger_id的预埋单

get_order_trigger

state<OrderTrigger> &get_order_trigger(uint64_t trigger_id);

根据trigger_id获取对应的预埋单, 如果不存在对应的预埋单会崩溃, 需要先调用has_order_trigger判断是否存在该预埋单撤

范例

 1// ctp预埋单委托下单请求响应, 根据响应信息修改预埋单状态
 2bool TraderCTP::custom_OnRspParkedOrderAction(const CThostFtdcParkedOrderActionField &ParkedOrderAction,
 3                                            const CThostFtdcRspInfoField &RspInfo, int nRequestID, bool bIsLast) {
 4SPDLOG_DEBUG("CThostFtdcParkedOrderActionField: {}", to_string(ParkedOrderAction));
 5SPDLOG_DEBUG("CThostFtdcRspInfoField: {}", to_string(RspInfo));
 6SPDLOG_DEBUG("nRequestID: {}, bIsLast: {}", nRequestID, bIsLast);
 7
 8auto trigger_id_iter = map_request_id_to_kf_action_id_.find(nRequestID);
 9if (trigger_id_iter == map_request_id_to_kf_action_id_.end()) {
10    SPDLOG_ERROR("CANNOT FIND trigger_id of {} in map_request_id_to_kf_action_id_", nRequestID);
11    return false;
12}
13
14auto trigger_id = trigger_id_iter->second;
15if (not has_order_trigger(trigger_id)) {
16    SPDLOG_ERROR("CANNOT FIND tigger_id {} in triggers_", trigger_id);
17    return false;
18}
19
20auto &trigger_state = get_order_trigger(trigger_id);
21map_trigger_id_to_ParkedOrderID_.insert_or_assign(
22    trigger_id, std::pair<std::string, bool>{ParkedOrderAction.ParkedOrderActionID, true});
23map_ParkedOrderId_to_trigger_id_.insert_or_assign(ParkedOrderAction.ParkedOrderActionID, trigger_id);
24trigger_state.data.status = parked_status_to_trigger_status(ParkedOrderAction.Status);
25strncpy(trigger_state.data.external_trigger_id, ParkedOrderAction.ParkedOrderActionID,
26        strlen(ParkedOrderAction.ParkedOrderActionID));
27trigger_state.data.error_id = RspInfo.ErrorID;
28const std::string msg = gbk2utf8(RspInfo.ErrorMsg);
29strncpy(trigger_state.data.error_msg, msg.c_str(), msg.length());
30trigger_state.data.update_time = time::now_in_nano();
31
32if (RspInfo.ErrorID != 0) {
33    trigger_state.data.status = OrderStatus::Error;
34    SPDLOG_ERROR("failed to ReqParkedOrderAction, ErrorId: {} ErrorMsg: {}, parked_order_action: {}", RspInfo.ErrorID,
35                gbk2utf8(RspInfo.ErrorMsg), to_string(ParkedOrderAction));
36}
37
38try_write_to(trigger_state.data, trigger_state.dest);
39SPDLOG_DEBUG("OrderTrigger: {}", trigger_state.data.to_string());
40
41return true;
42}

has_order_trigger_action

bool has_order_trigger_action(uint64_t action_id) const;

判断是否存在该action_id的预埋单操作

get_order_trigger

state<OrderTriggerAction> &get_order_trigger_action(uint64_t action_id);

根据action_id获取对应的预埋单撤单操作, 如果不存在对应的预埋单撤单操作会崩溃, 需要先调用has_order_trigger_action判断是否存在该预埋单撤单操


now

int64_t now() const;

触发当前回调函数的journal数据的gen_time;

例如, 当前处于insert_order回调函数, now()的值为OrderInput数据的gen_time; 当前处于cancel_order回调函数, now()的值为OrderAction数据的gen_time;

不要在子线程中调用使用该函数, 在子线程中该值无意义.


now_in_nano

int64_t time::now_in_nano();

系统的当前实际时间, 与当前处于哪个回调函数无关, 主线程和子线程都用都表示的是当前的实际时间.


add_timer

int32_t add_timer(int64_t nanotime, const std::function<void(const event_ptr &)> &callback);

添加定时器, 当实际时间到nanotime时, 执行一次callback函数.

返回值是timer_id, 可以根据该值取消该定时器.

范例

 1// ctp查询资金, 如果失败添加定时器2秒后再查一次, 直到查询成功
 2void TraderCTP::add_timer_req_account(int64_t nano) {
 3add_timer(nano, [&](const auto &event) {
 4    CThostFtdcQryTradingAccountField req = {};
 5    strcpy(req.BrokerID, config_.broker_id.c_str());
 6    strcpy(req.InvestorID, config_.account_id.c_str());
 7    SPDLOG_TRACE("CThostFtdcQryTradingAccountField: {}", to_string(req));
 8    // std::this_thread::sleep_for(std::chrono::seconds(1));
 9    int rtn = api_->ReqQryTradingAccount(&req, get_request_id());
10    SPDLOG_TRACE("ReqQryTradingAccount rtn {}", rtn);
11    if (rtn != 0) {
12    add_timer_req_account(time::now_in_nano() + 2 * time_unit::NANOSECONDS_PER_SECOND);
13    }
14});
15}

add_time_interval

int32_t add_time_interval(int64_t nanotime, const std::function<void(const event_ptr &)> &callback);

添加间隔定时器, 从现在开始, 每隔nanotime时间, 执行一次callback函数.

返回值是timer_id, 可以根据该值取消该定时器.

范例

1// ledger模块每隔一分钟, 通知所有TD查询资金和持仓
2if (sync_asset_) {
3    add_time_interval(time_unit::NANOSECONDS_PER_MINUTE,
4                    [&](const event_ptr &e) { request_asset_sync(e->gen_time()); });
5}
6if (sync_position_) {
7    add_time_interval(time_unit::NANOSECONDS_PER_MINUTE,
8                    [&](const event_ptr &e) { request_position_sync(e->gen_time()); });
9}

clear_timer

void clear_timer(int32_t timer_id);

根据timer_id取消由add_timer和and_time_interval生成的定时器.


request_deregister

void request_deregister()

申请停止进程, 调用该函数后, 在执行完当前回调函数后, 进程会停止.


推送处理

成交推处理

委托成交时, 会触发柜台API的成交推送函数, 需要根据推送的成交数据中的 柜台API委托号 找到Kungfu对应的Order, 然后生成Trade, 并且修改Order的状态.

针对于异步柜台, 可能会发生成交推送函数早于委托响应函数触发的情况, 此时还没有获得*柜台API委托号*, 无法找到Kungfu对应的Order, 此时需要先将委托暂存, 当收到委托响应后, 再将成交数据取出来处理.

重启TD时, 查询并恢复今日的成交数据, 可能会导致同一笔成交处理两次, 所以针对于已处理的成交数据做标记, 再次收到已经处理过的成交数据时, 不再处理.

范例

  1// ctp的处理实现
  2
  3std::unordered_map<std::string, std::unordered_set<std::string>> map_ExchangeID_OrderSysID_to_TradeIDs_{}; // <交易所:订单号, set<已经处理过的成交编号>>
  4std::unordered_map<std::string, std::vector<CThostFtdcTradeField>> map_ExchangeID_OrderSysID_to_CThostFtdcTradeFields_{}; // <交易所:订单号, vector<先于委托响应回来的成交回报>>
  5std::unordered_map<std::string, uint64_t> map_ExchangeID_OrderSysID_to_kf_order_id_{}; // <交易所:订单号, kf_order_id>
  6std::unordered_map<uint64_t, std::string> map_kf_order_id_to_OrderSysID_{}; // <kf_order_id, 订单号> 撤单用
  7std::unordered_map<uint64_t, std::string> map_kf_order_id_to_OrderRef_{}; // <kf_order_id, OrderRef> 撤未提交到交易所的单用
  8
  9// 判断成交是否已经处理
 10bool TraderCTP::has_dealt_trade(const CThostFtdcTradeField &ctp_trade) {
 11    auto &dealt_TradeIDs = map_ExchangeID_OrderSysID_to_TradeIDs_
 12                                .try_emplace(make_ExchangeID_OrderSysID(ctp_trade.ExchangeID, ctp_trade.OrderSysID))
 13                                .first->second;
 14    return dealt_TradeIDs.find(ctp_trade.TradeID) != dealt_TradeIDs.end();
 15}
 16
 17// 记录已经处理过的成交
 18void TraderCTP::add_TradeID(const CThostFtdcTradeField &ctp_trade) {
 19    map_ExchangeID_OrderSysID_to_TradeIDs_
 20        .try_emplace(make_ExchangeID_OrderSysID(ctp_trade.ExchangeID, ctp_trade.OrderSysID))
 21        .first->second.insert(ctp_trade.TradeID);
 22}
 23
 24// 成交推送处理
 25bool TraderCTP::custom_OnRtnTrade(const CThostFtdcTradeField &ctp_trade) {
 26    SPDLOG_DEBUG("CThostFtdcTradeField: {}", to_string(ctp_trade));
 27    if (ctp_trade.Price < 1 || ctp_trade.Price == 2.2250738585072014e-308 || ctp_trade.Price == 1.7976931348623158e+308 ||
 28        ctp_trade.Volume == -2147483648 || ctp_trade.Volume == 2147483647) {
 29        SPDLOG_ERROR("Order Price too low, Not True ctp_trade.Price {}, *ctp_trade {}", ctp_trade.Price,
 30                    to_string(ctp_trade));
 31        return false;
 32    }
 33
 34    const std::string &str_ExchangeID_OrderSysID = make_ExchangeID_OrderSysID(ctp_trade.ExchangeID, ctp_trade.OrderSysID);
 35    auto ExchangeID_OrderSysID_iter = map_ExchangeID_OrderSysID_to_kf_order_id_.find(str_ExchangeID_OrderSysID);
 36    if (ExchangeID_OrderSysID_iter == map_ExchangeID_OrderSysID_to_kf_order_id_.end()) {
 37       SPDLOG_WARN("CANNOT FIND {} in map_ExchangeID_OrderSysID_to_kf_order_id_, STORE CThostFtdcTradeField IN "
 38                    "map_ExchangeID_OrderSysID_to_CThostFtdcTradeFields_",
 39                    str_ExchangeID_OrderSysID);
 40        map_ExchangeID_OrderSysID_to_CThostFtdcTradeFields_.try_emplace(str_ExchangeID_OrderSysID)
 41            .first->second.push_back(ctp_trade); // 暂存先于委托响应收到的成交推送
 42        return false;
 43    }
 44    deal_trade(ctp_trade);
 45    return true;
 46}
 47
 48// 委托响应和推送处理
 49bool TraderCTP::custom_OnRtnOrder(const CThostFtdcOrderField &ctp_order) {
 50    SPDLOG_DEBUG("CThostFtdcOrderField: {}", to_string(ctp_order));
 51
 52    const std::string str_ExchangeID_OrderLocalID =
 53        make_ExchangeID_OrderSysID(ctp_order.ExchangeID, ctp_order.OrderLocalID);
 54    const std::string str_ExchangeID_OrderSysID = make_ExchangeID_OrderSysID(ctp_order.ExchangeID, ctp_order.OrderSysID);
 55    uint64_t orderRef_key = get_orderRef_key(ctp_order.FrontID, ctp_order.SessionID, ctp_order.OrderRef);
 56    auto OrderRefKey_iter = map_OrderRefKey_to_kf_order_id_.find(orderRef_key);
 57    auto ExchangeID_OrderSysID_iter = map_ExchangeID_OrderSysID_to_kf_order_id_.find(str_ExchangeID_OrderSysID);
 58    auto ExchangeID_OrderLocalID_iter = map_ExchangeID_OrderSysID_to_kf_order_id_.find(str_ExchangeID_OrderLocalID);
 59
 60    // 系统外订单信息
 61    if (OrderRefKey_iter == map_OrderRefKey_to_kf_order_id_.end() and
 62        ExchangeID_OrderSysID_iter == map_ExchangeID_OrderSysID_to_kf_order_id_.end() and
 63        ExchangeID_OrderLocalID_iter == map_ExchangeID_OrderSysID_to_kf_order_id_.end()) {
 64        SPDLOG_WARN("external Order with ExchangeID: {}, OrderSysID: {}, OrderLocalID: {}", ctp_order.ExchangeID,
 65                    ctp_order.OrderSysID, ctp_order.OrderLocalID);
 66        return generate_external_order(ctp_order);
 67    }
 68
 69    uint64_t kf_order_id = 0;
 70    if (strlen(ctp_order.OrderSysID) != 0 and
 71        ExchangeID_OrderSysID_iter != map_ExchangeID_OrderSysID_to_kf_order_id_.end()) {
 72        kf_order_id = ExchangeID_OrderSysID_iter->second;
 73    } else if (OrderRefKey_iter != map_OrderRefKey_to_kf_order_id_.end()) {
 74        kf_order_id = OrderRefKey_iter->second;
 75    } else if (ExchangeID_OrderLocalID_iter != map_ExchangeID_OrderSysID_to_kf_order_id_.end()) {
 76        kf_order_id = ExchangeID_OrderLocalID_iter->second;
 77    } else {
 78        SPDLOG_ERROR("invalidate CThostFtdcOrderField: {}", to_string(ctp_order));
 79        return false;
 80    }
 81
 82    if (strlen(ctp_order.OrderSysID) != 0) {
 83        map_kf_order_id_to_OrderSysID_.insert_or_assign(kf_order_id, ctp_order.OrderSysID);
 84        map_ExchangeID_OrderSysID_to_kf_order_id_.insert_or_assign(str_ExchangeID_OrderSysID, kf_order_id);
 85    }
 86
 87    if (not has_order(kf_order_id)) {
 88        SPDLOG_WARN("no order_id {} in orders_", kf_order_id);
 89        return generate_external_order(ctp_order);
 90    }
 91
 92    auto &order_state = get_order(kf_order_id);
 93
 94    if (is_final_status(order_state.data.status) and order_state.data.status != longfist::enums::OrderStatus::Lost and
 95        order_state.data.status != longfist::enums::OrderStatus::Cancelling) {
 96        return true;
 97    }
 98
 99    from_ctp(ctp_order, order_state.data);
100    order_state.data.update_time = time::now_in_nano();
101    if (has_writer(order_state.dest)) {
102        write_to(order_state.data, order_state.dest);
103    } else {
104        ++try_write_to_order_count;
105        try_write_to(order_state.data, order_state.dest, [&]() {
106            --try_write_to_order_count;
107            try_ready();
108        });
109    }
110    try_deal_trade(str_ExchangeID_OrderSysID); // 处理暂存的委托
111    return true;
112}
113
114// 处理暂存的委托
115void TraderCTP::try_deal_trade(const std::string &str_ExchangeID_OrderSysID) {
116    SPDLOG_DEBUG("ExchangeID_OrderSysID: {}", str_ExchangeID_OrderSysID);
117    auto &ctp_trades =
118        map_ExchangeID_OrderSysID_to_CThostFtdcTradeFields_.try_emplace(str_ExchangeID_OrderSysID).first->second;
119    for (const CThostFtdcTradeField &ctp_trade : ctp_trades) {
120        deal_trade(ctp_trade);
121    }
122    ctp_trades.clear();
123}
124
125// 生成Trade
126void TraderCTP::deal_trade(const CThostFtdcTradeField &ctp_trade) {
127    SPDLOG_DEBUG("CThostFtdcTradeField: {}", to_string(ctp_trade));
128    const std::string &str_ExchangeID_OrderSysID = make_ExchangeID_OrderSysID(ctp_trade.ExchangeID, ctp_trade.OrderSysID);
129    auto ExchangeID_OrderSysID_iter = map_ExchangeID_OrderSysID_to_kf_order_id_.find(str_ExchangeID_OrderSysID);
130    if (ExchangeID_OrderSysID_iter == map_ExchangeID_OrderSysID_to_kf_order_id_.end()) {
131        SPDLOG_ERROR("CANNOT FIND {} in map_ExchangeID_OrderSysID_to_kf_order_id_", str_ExchangeID_OrderSysID);
132        return;
133    }
134    uint64_t kf_order_id = ExchangeID_OrderSysID_iter->second;
135    if (not has_order(kf_order_id)) {
136        SPDLOG_ERROR("no order_id {} in orders_", kf_order_id);
137        return;
138    }
139
140    // 判断委托是否已经处理过
141    if (has_dealt_trade(ctp_trade)) {
142        SPDLOG_WARN("trade: [{}:{}] HAS DEALT, DO NOT DEAL SECOND TIME", str_ExchangeID_OrderSysID, ctp_trade.TradeID);
143        return;
144    }
145    add_TradeID(ctp_trade); // 记录已处理委托
146
147    auto &order_state = get_order(kf_order_id);
148    // 生成Trade
149    if (has_writer(order_state.dest)) {
150        auto writer = get_writer(order_state.dest);
151        Trade &trade = writer->open_data<Trade>(0);
152        from_ctp(ctp_trade, trade);
153        uint64_t trade_id = writer->current_frame_uid();
154        trade.trade_id = trade_id;
155        trade.order_id = order_state.data.order_id;
156        SPDLOG_DEBUG("Trade: {}", trade.to_string());
157        writer->close_data();
158    } else {
159        Trade trade{};
160        from_ctp(ctp_trade, trade);
161        uint64_t trade_id = get_public_writer()->current_frame_uid() xor (time::now_in_nano() & 0x0000FFFF);
162        trade.trade_id = trade_id;
163        trade.order_id = order_state.data.order_id;
164        SPDLOG_DEBUG("Trade: {}", trade.to_string());
165        ++try_write_to_trade_count;
166        try_write_to(trade, order_state.dest, [&]() {
167            --try_write_to_trade_count;
168            try_ready();
169        });
170    }
171}

系统外委托和成交推送处理

当同一个交易账户在多个客户端同时登陆时, Kungfu客户端会收到其他客户端提交的委托的推送信息, 可以根据个人需求选择是否要处理系统外的委托和成交.

范例

  1// ctp处理实现
  2
  3// 委托响应和推送处理
  4bool TraderCTP::custom_OnRtnOrder(const CThostFtdcOrderField &ctp_order) {
  5    SPDLOG_DEBUG("CThostFtdcOrderField: {}", to_string(ctp_order));
  6
  7    const std::string str_ExchangeID_OrderLocalID =
  8        make_ExchangeID_OrderSysID(ctp_order.ExchangeID, ctp_order.OrderLocalID);
  9    const std::string str_ExchangeID_OrderSysID = make_ExchangeID_OrderSysID(ctp_order.ExchangeID, ctp_order.OrderSysID);
 10    uint64_t orderRef_key = get_orderRef_key(ctp_order.FrontID, ctp_order.SessionID, ctp_order.OrderRef);
 11    auto OrderRefKey_iter = map_OrderRefKey_to_kf_order_id_.find(orderRef_key);
 12    auto ExchangeID_OrderSysID_iter = map_ExchangeID_OrderSysID_to_kf_order_id_.find(str_ExchangeID_OrderSysID);
 13    auto ExchangeID_OrderLocalID_iter = map_ExchangeID_OrderSysID_to_kf_order_id_.find(str_ExchangeID_OrderLocalID);
 14
 15    // 系统外订单信息
 16    if (OrderRefKey_iter == map_OrderRefKey_to_kf_order_id_.end() and
 17        ExchangeID_OrderSysID_iter == map_ExchangeID_OrderSysID_to_kf_order_id_.end() and
 18        ExchangeID_OrderLocalID_iter == map_ExchangeID_OrderSysID_to_kf_order_id_.end()) {
 19        SPDLOG_WARN("external Order with ExchangeID: {}, OrderSysID: {}, OrderLocalID: {}", ctp_order.ExchangeID,
 20                    ctp_order.OrderSysID, ctp_order.OrderLocalID);
 21        return generate_external_order(ctp_order); // 处理系统外委托
 22    }
 23
 24    uint64_t kf_order_id = 0;
 25    if (strlen(ctp_order.OrderSysID) != 0 and
 26        ExchangeID_OrderSysID_iter != map_ExchangeID_OrderSysID_to_kf_order_id_.end()) {
 27        kf_order_id = ExchangeID_OrderSysID_iter->second;
 28    } else if (OrderRefKey_iter != map_OrderRefKey_to_kf_order_id_.end()) {
 29        kf_order_id = OrderRefKey_iter->second;
 30    } else if (ExchangeID_OrderLocalID_iter != map_ExchangeID_OrderSysID_to_kf_order_id_.end()) {
 31        kf_order_id = ExchangeID_OrderLocalID_iter->second;
 32    } else {
 33        SPDLOG_ERROR("invalidate CThostFtdcOrderField: {}", to_string(ctp_order));
 34        return false;
 35    }
 36
 37    if (strlen(ctp_order.OrderSysID) != 0) {
 38        map_kf_order_id_to_OrderSysID_.insert_or_assign(kf_order_id, ctp_order.OrderSysID);
 39        map_ExchangeID_OrderSysID_to_kf_order_id_.insert_or_assign(str_ExchangeID_OrderSysID, kf_order_id);
 40    }
 41
 42    if (not has_order(kf_order_id)) {
 43        SPDLOG_WARN("no order_id {} in orders_", kf_order_id);
 44        return generate_external_order(ctp_order);
 45    }
 46
 47    auto &order_state = get_order(kf_order_id);
 48
 49    if (is_final_status(order_state.data.status) and order_state.data.status != longfist::enums::OrderStatus::Lost and
 50        order_state.data.status != longfist::enums::OrderStatus::Cancelling) {
 51        return true;
 52    }
 53
 54    from_ctp(ctp_order, order_state.data);
 55    order_state.data.update_time = time::now_in_nano();
 56    if (has_writer(order_state.dest)) {
 57        write_to(order_state.data, order_state.dest);
 58    } else {
 59        ++try_write_to_order_count;
 60        try_write_to(order_state.data, order_state.dest, [&]() {
 61        --try_write_to_order_count;
 62        try_ready();
 63        });
 64    }
 65    try_deal_trade(str_ExchangeID_OrderSysID); // 处理暂存的委托
 66    return true;
 67}
 68
 69// 根据系统外的委托推送信息生成Kunfu的Order
 70bool TraderCTP::generate_external_order(const CThostFtdcOrderField &ctp_order) {
 71    SPDLOG_DEBUG("CThostFtdcOrderField: {}", to_string(ctp_order));
 72
 73    if (not config_.sync_external_order) {
 74        return false;
 75    }
 76
 77    auto nano = time::now_in_nano();
 78    auto writer_order = get_public_writer();
 79    Order &order = writer_order->open_data<Order>(now());
 80    order.order_id = writer_order->current_frame_uid();
 81    from_ctp(ctp_order, order);
 82    order.insert_time = nsec_from_ctp_time(ctp_order.InsertDate, ctp_order.InsertTime);
 83    if (order.insert_time > nano) {
 84        order.insert_time -= time_unit::NANOSECONDS_PER_DAY;
 85    }
 86    order.update_time = nano;
 87    writer_order->close_data();
 88    SPDLOG_DEBUG("Order: {}", order.to_string());
 89
 90    uint64_t orderRef_key = get_orderRef_key(ctp_order.FrontID, ctp_order.SessionID, ctp_order.OrderRef);
 91    map_OrderRefKey_to_kf_order_id_.insert_or_assign(orderRef_key, uint64_t(order.order_id));
 92
 93    const std::string str_ExchangeID_OrderSysID =
 94        strlen(ctp_order.OrderSysID) == 0 ? make_ExchangeID_OrderSysID(ctp_order.ExchangeID, ctp_order.OrderLocalID)
 95                                            : make_ExchangeID_OrderSysID(ctp_order.ExchangeID, ctp_order.OrderSysID);
 96    if (strlen(ctp_order.OrderSysID) != 0) {
 97        map_ExchangeID_OrderSysID_to_kf_order_id_.insert_or_assign(str_ExchangeID_OrderSysID, uint64_t(order.order_id));
 98        map_kf_order_id_to_OrderSysID_.insert_or_assign(uint64_t(order.order_id), ctp_order.OrderSysID);
 99    }
100    try_deal_trade(str_ExchangeID_OrderSysID); // 处理暂存的成交
101    return true;
102}

备注

如果系统外的委托推送先于成交推送到达, 处理完系统外委托后, 成交推送就可以当做系统内正常顺序的成交推送处理.

如果系统外的成交推送先于委托推送到达, 成交信息会暂存, 处理完系统外委托后, 会和处理系统内暂存的成交信息一样处理.


行情源对接

行情接口基类

 1class MarketData : public BrokerService {
 2public:
 3  explicit MarketData(BrokerVendor &vendor) : BrokerService(vendor) {}
 4
 5  // 策略或前端调用subscribe订阅指定标的行情时会触发该回调, 用于根据交易所和标的号订阅行情
 6  virtual bool subscribe(const std::vector<longfist::types::InstrumentKey> &instrument_keys);
 7
 8  // subscribe_custom默认实现会调用该回调, 用于订阅行情源全市场行情
 9  virtual bool subscribe_all();
10
11  // 策略调用subscribe_all会触发该回调, 用于根据自定义的行情类型, 标的类型, 逐笔类型进行全市场订阅; 默认实现为调用MarketData::subscribe_all
12  virtual bool subscribe_custom(const longfist::types::CustomSubscribe &custom_sub);
13
14  // 保留接口
15  virtual bool unsubscribe(const std::vector<longfist::types::InstrumentKey> &instrument_keys);
16
17  // 收到master广播的Band数据会触发该回调, 用于订阅band信道
18  virtual void on_band(const event_ptr &event);
19
20protected:
21  [[nodiscard]] bool has_instrument(const std::string &instrument_id) const;
22
23  [[nodiscard]] const longfist::types::Instrument &get_instrument(const std::string &instrument_id) const;
24
25  void update_instrument(longfist::types::Instrument instrument);
26
27  void try_subscribe();
28
29  void add_instrument_key(const longfist::types::InstrumentKey &key);
30
31  std::unordered_map<std::string, longfist::types::Instrument> instruments_ = {};
32  std::vector<longfist::types::InstrumentKey> instruments_to_subscribe_ = {};
33};

主要回调接口

subscribe

virtual bool subscribe(const std::vector<InstrumentKey> &instrument_keys);

用于根据交易所和标的号订阅行情

策略或前端订阅指定标的时, 订阅的标的会暂存在 std::vector<InstrumentKey> instruments_to_subscribe_, 每隔一秒会检查 instruments_to_subscribe_.empty(), 如果非空则调用 subscribe(instruments_to_subscribe_), 结束后执行 instruments_to_subscribe_.clear() 清空列表.

参数

参数

类型

说明

instrument_keys

const std::vector<InstrumentKey> &

订阅的标的列表

返回值

类型

说明

bool

订阅成功返回true, 订阅失败返回false

范例

 1// ctp的subscribe实现
 2bool MarketDataCTP::subscribe(const std::vector<InstrumentKey> &instruments) {
 3  auto length = instruments.size();
 4  auto targets = new char *[length];
 5  for (int i = 0; i < length; i++) {
 6    targets[i] = const_cast<char *>(instruments[i].instrument_id.value);
 7  }
 8  auto rtn = api_->SubscribeMarketData(targets, length);
 9  delete[] targets;
10  return rtn == 0;
11}

subscribe_all

virtual bool subscribe_all();

用于订阅全市场行情

参数

参数

类型

说明

返回值

类型

说明

bool

订阅成功返回true, 订阅失败返回false

范例

1// xtp的subscribe_all实现
2bool subscribe_all() override {
3  auto result = api_->SubscribeAllMarketData() && api_->SubscribeAllTickByTick();
4  SPDLOG_INFO("subscribe all, rtn code {}", result);
5  return result;
6}

subscribe_custom

virtual bool subscribe_custom(const longfist::types::CustomSubscribe &custom_sub);

用于根据自定义的行情类型, 标的类型, 逐笔类型进行全市场订阅; 默认实现为调用 MarketData::subscribe_all

参数

参数

类型

说明

custom_sub

CustomSubscribe

自定义交易市场,

返回值

类型

说明

bool

订阅成功返回true, 订阅失败返回false


unsubscribe

virtual bool unsubscribe(const std::vector<longfist::types::InstrumentKey> &instrument_keys);

最初设计时是为了方面取消订阅指定的标的行情, 但是在多策略场景下, 一个策略调用取消订阅了指定标的后, 会导致订阅了同样标的的其他策略无法收到行情, 故而不建议使用.


on_band

virtual void on_band(const event_ptr &event);

当有一个进程调用request_band创建一个写入信道时, master会广播这条信道的信息, 所有的进程都会收到一个Band信息, 可以在on_band中选择是否要订阅这条信道.

一般场景下不会用到该接口.


主要主调接口

request_band

uint32_t request_band(const std::string &band_name, uint64_t page_size = 0);

申请一个band信道, 返回结果为band对应的location_uid, page_size为journal每一页的大小.

范例

1// xtp行情在pre_start中申请两个大小为256MB的信道, 用于写入逐笔委托和逐笔成交行情
2void MarketDataXTP::pre_start() {
3    entrust_band_uid_ = request_band("market-data-band-entrust", 256);
4    transaction_band_uid_ = request_band("market-data-band-transaction", 256);
5}

update_broker_state

void update_broker_state(BrokerState state);

修改TD的状态

范例

 1// xtp在行情登录成功后将MD状态设置为Ready
 2void MarketDataXTP::on_start() {
 3    MDConfiguration config = nlohmann::json::parse(get_config());
 4    if (config.client_id < 1 or config.client_id > 99) {
 5        SPDLOG_ERROR("client_id must between 1 and 99");
 6    }
 7    auto md_ip = config.md_ip.c_str();
 8    auto account_id = config.account_id.c_str();
 9    auto password = config.password.c_str();
10    auto protocol_type = get_xtp_protocol_type(config.protocol);
11    std::string runtime_folder = get_runtime_folder();
12    SPDLOG_INFO("Connecting XTP MD for {} at {}://{}:{}", account_id, config.protocol, md_ip, config.md_port);
13    api_ =
14        XTP::API::QuoteApi::CreateQuoteApi(config.client_id, runtime_folder.c_str(), XTP_LOG_LEVEL::XTP_LOG_LEVEL_INFO);
15    if (config.protocol == "udp") {
16        api_->SetUDPBufferSize(config.buffer_size);
17    }
18    api_->RegisterSpi(this);
19    if (api_->Login(md_ip, config.md_port, account_id, password, protocol_type) == 0) {
20        update_broker_state(BrokerState::LoggedIn);
21        update_broker_state(BrokerState::Ready);
22        SPDLOG_INFO("login success! (account_id) {}", config.account_id);
23        if (config.query_instruments and not check_if_stored_instruments(time::strfnow("%Y%m%d"))) {
24        api_->QueryAllTickers(XTP_EXCHANGE_SH);
25        api_->QueryAllTickers(XTP_EXCHANGE_SZ);
26        api_->QueryAllTickersFullInfo(XTP_EXCHANGE_SH);
27        api_->QueryAllTickersFullInfo(XTP_EXCHANGE_SZ);
28        }
29    } else {
30        update_broker_state(BrokerState::LoginFailed);
31        SPDLOG_ERROR("failed to login, [{}] {}", api_->GetApiLastError()->error_id, api_->GetApiLastError()->error_msg);
32    }
33}

has_band_writer

bool has_band_writer(uint32_t dest_id) const;

判断是否有写入到band的writer

get_band_writer

yijinjing::journal::writer_ptr get_band_writer(uint32_t dest_id) const;

获取写入到Bnad的writer

范例

 1// 收到逐笔推送时, 判断是否存在写入到Band的writer, 然后获取对应的writer赋值给对应变量, 下次收到行情推送时可以直接使用writer, 减少对map的访问
 2void MarketDataXTP::OnTickByTick(XTPTBT *tbt_data) {
 3    if (tbt_data->type == XTP_TBT_ENTRUST) {
 4        if (tbt_data->entrust.ord_type == 'D') {
 5        if (not transaction_band_writer_) {
 6            if (not has_band_writer(transaction_band_uid_)) {
 7            SPDLOG_INFO("band writer for market-data-band-transaction not ready");
 8            return;
 9            }
10            transaction_band_writer_ = get_band_writer(transaction_band_uid_);
11        }
12        Transaction &transaction = transaction_band_writer_->open_data<Transaction>(0);
13        from_xtp(*tbt_data, transaction);
14        transaction_band_writer_->close_data();
15        } else {
16        if (not entrust_band_writer_) {
17            if (not has_band_writer(entrust_band_uid_)) {
18            SPDLOG_INFO("band writer for market-data-band-entrust not ready");
19            return;
20            }
21            entrust_band_writer_ = get_band_writer(entrust_band_uid_);
22        }
23        Entrust &entrust = entrust_band_writer_->open_data<Entrust>(0);
24        from_xtp(*tbt_data, entrust);
25        entrust_band_writer_->close_data();
26        }
27    } else if (tbt_data->type == XTP_TBT_TRADE) {
28        if (not transaction_band_writer_) {
29        if (not has_band_writer(transaction_band_uid_)) {
30            SPDLOG_INFO("band writer for market-data-band-transaction not ready");
31            return;
32        }
33        transaction_band_writer_ = get_band_writer(transaction_band_uid_);
34        }
35        Transaction &transaction = transaction_band_writer_->open_data<Transaction>(0);
36        from_xtp(*tbt_data, transaction);
37        transaction_band_writer_->close_data();
38    }
39}

get_public_writer

writer_ptr &get_public_writer()

dest为0的writer单独使用一个变量存放, 调用该接口直接返回写入dest为0的writer, 效果等价于get_writer(0), 区别是该接口不涉及stl容器访问, 可以在子线程调用

open_data<T>

template <typename T> std::enable_if_t<size_fixed_v<T>, T &> open_data(int64_t trigger_time = 0);

该接口是属于writer的接口, 用于生成Order, Trade 等数据使用

close_data

void close_data(int64_t gen_time = time::now_in_nano());

该接口是属于writer的接口, 用于在open_data完成数据写入后, 标记写入完成;

open_data和close_data必须配对使用, 同一个writer中间不可以进行嵌套

范例

 1// xtp在收到快照行情推送时, 将数据转换成Quote写到PUBLIC进行广播
 2void MarketDataXTP::OnQueryAllTickers(XTPQSI *ticker_info, XTPRI *error_info, bool is_last) {
 3    if (nullptr != error_info && error_info->error_id != 0) {
 4        SPDLOG_ERROR("error_id : {} , error_msg : {}", error_info->error_id, error_info->error_msg);
 5        return;
 6    }
 7
 8    if (nullptr == ticker_info) {
 9        SPDLOG_ERROR("ticker_info is nullptr");
10        return;
11    }
12
13    Instrument &instrument = get_public_writer()->open_data<Instrument>(0);
14    from_xtp(ticker_info, instrument);
15    get_public_writer()->close_data();
16}

行情推送处理

快照行情写入

当行情源API的快照回调虚函数触发时, 需要将行情源的快照数据转换成kungfu::longfist::types::Quote数据结构, 快照数据可以直接写入到PUBLIC信道里, 具体操作方式参考以下XTP范例.

范例

 1// 交易所转换
 2inline void from_xtp(const XTP_MARKET_TYPE &xtp_market_type, char *exchange_id) {
 3    if (xtp_market_type == XTP_MKT_SH_A) {
 4        strcpy(exchange_id, "SSE");
 5    } else if (xtp_market_type == XTP_MKT_SZ_A) {
 6        strcpy(exchange_id, "SZE");
 7    }
 8}
 9
10// xtp的Quote数据结构转换实现
11inline void from_xtp(const XTPMarketDataStruct &ori, Quote &des) {
12    des.data_time = nsec_from_xtp_timestamp(ori.data_time);
13    des.instrument_id = ori.ticker;
14    from_xtp(ori.exchange_id, des.exchange_id);
15
16    des.instrument_type = ori.data_type != XTP_MARKETDATA_OPTION ? get_instrument_type(des.exchange_id, des.instrument_id)
17                                                                : InstrumentType::StockOption;
18
19    des.last_price = ori.last_price;
20    des.pre_settlement_price = ori.pre_settl_price;
21    des.pre_close_price = ori.pre_close_price;
22    des.open_price = ori.open_price;
23    des.high_price = ori.high_price;
24    des.low_price = ori.low_price;
25    des.volume = ori.qty;
26    des.turnover = ori.turnover;
27    des.close_price = ori.close_price;
28    des.settlement_price = ori.settl_price;
29    des.upper_limit_price = ori.upper_limit_price;
30    des.lower_limit_price = ori.lower_limit_price;
31    des.total_trade_num = ori.trades_count;
32
33    memcpy(des.ask_price, ori.ask, sizeof(des.ask_price));
34    memcpy(des.bid_price, ori.bid, sizeof(des.ask_price));
35    for (std::size_t i = 0; i < 10; i++) {
36        des.ask_volume[i] = ori.ask_qty[i];
37        des.bid_volume[i] = ori.bid_qty[i];
38    }
39}
40
41void MarketDataXTP::OnDepthMarketData(XTPMD *market_data, int64_t *bid1_qty, int32_t bid1_count, int32_t max_bid1_count,
42                                  int64_t *ask1_qty, int32_t ask1_count, int32_t max_ask1_count) {
43    if (nullptr == market_data) {
44        SPDLOG_ERROR("XTPMD is nullptr");
45    }
46
47    Quote &quote = get_public_writer()->open_data<Quote>(0);
48    from_xtp(*market_data, quote);
49    get_public_writer()->close_data();
50}

逐笔行情写入

当行情源API的快照回调虚函数触发时, 需要将行情源的逐笔委托行情数据转换成 kungfu::longfist::types::Entrust 数据结构, 需要将行情源的逐笔成交行情数据转换成 kungfu::longfist::types::Transaction 数据结构.

建议分别为Entrust和Transaction分别创建各自的band的信道, 具体操作参考以下XTP范例.

范例

  1// 声明两个线程私有writer变量
  2class MarketDataXTP : public XTP::API::QuoteSpi, public broker::MarketData {
  3// ... 其他内容
  4private:
  5    inline static thread_local yijinjing::journal::writer_ptr entrust_band_writer_ = nullptr;
  6    inline static thread_local yijinjing::journal::writer_ptr transaction_band_writer_ = nullptr;
  7};
  8
  9// pre_start 申请创建对应的band信道
 10void MarketDataXTP::pre_start() {
 11    entrust_band_uid_ = request_band("market-data-band-entrust", 256);
 12    transaction_band_uid_ = request_band("market-data-band-transaction", 256);
 13}
 14
 15// 交易所转换
 16inline void from_xtp(const XTP_MARKET_TYPE &xtp_market_type, char *exchange_id) {
 17    if (xtp_market_type == XTP_MKT_SH_A) {
 18        strcpy(exchange_id, "SSE");
 19    } else if (xtp_market_type == XTP_MKT_SZ_A) {
 20        strcpy(exchange_id, "SZE");
 21    }
 22}
 23
 24// 逐笔委托转换
 25inline void from_xtp(const XTPTickByTickStruct &ori, Entrust &des) {
 26    from_xtp(ori.exchange_id, des.exchange_id);
 27    des.instrument_id = ori.ticker;
 28    des.data_time = nsec_from_xtp_timestamp(ori.data_time);
 29
 30    des.price = ori.entrust.price;
 31    des.volume = ori.entrust.qty;
 32    des.main_seq = ori.entrust.channel_no;
 33    des.seq = ori.entrust.seq;
 34
 35    if (ori.entrust.ord_type == '1') {
 36        des.price_type = PriceType::Any;
 37    } else if (ori.entrust.ord_type == '2') {
 38        des.price_type = PriceType::Limit;
 39    } else if (ori.entrust.ord_type == 'U') {
 40        des.price_type = PriceType::ForwardBest;
 41    }
 42
 43    // xtp(深交所的order_no在xtp接口注释标注为无意义,偶尔为0 seq对应的是真正的订单号 上交所的order_no是订单号)
 44    if (strcmp(des.exchange_id, "SSE")) {
 45        des.orig_order_no = ori.entrust.order_no;
 46    } else {
 47        des.orig_order_no = ori.entrust.seq;
 48    }
 49
 50    switch (ori.entrust.side) {
 51    case 'B': {
 52        des.side = Side::Buy;
 53        break;
 54    }
 55    case 'S': {
 56        des.side = Side::Sell;
 57        break;
 58    }
 59    case '1': {
 60        des.side = Side::Buy;
 61        break;
 62    }
 63    case '2': {
 64        des.side = Side::Sell;
 65        break;
 66    }
 67    default: {
 68        des.side = Side::Unknown;
 69        break;
 70    }
 71    }
 72}
 73
 74
 75
 76
 77// XTP行情源的逐笔行情回调
 78void MarketDataXTP::OnTickByTick(XTPTBT *tbt_data) {
 79    if (tbt_data->type == XTP_TBT_ENTRUST) {
 80        if (tbt_data->entrust.ord_type == 'D') {
 81            if (not transaction_band_writer_) {
 82                if (not has_band_writer(transaction_band_uid_)) {
 83                    return;
 84                }
 85                transaction_band_writer_ = get_band_writer(transaction_band_uid_);
 86            }
 87            Transaction &transaction = transaction_band_writer_->open_data<Transaction>(0);
 88            from_xtp(*tbt_data, transaction);
 89            transaction_band_writer_->close_data();
 90        } else {
 91            if (not entrust_band_writer_) {
 92                if (not has_band_writer(entrust_band_uid_)) {
 93                return;
 94                }
 95                entrust_band_writer_ = get_band_writer(entrust_band_uid_);
 96            }
 97            Entrust &entrust = entrust_band_writer_->open_data<Entrust>(0);
 98            from_xtp(*tbt_data, entrust);
 99            entrust_band_writer_->close_data();
100        }
101    } else if (tbt_data->type == XTP_TBT_TRADE) {
102        if (not transaction_band_writer_) {
103            if (not has_band_writer(transaction_band_uid_)) {
104                return;
105            }
106            transaction_band_writer_ = get_band_writer(transaction_band_uid_);
107        }
108        Transaction &transaction = transaction_band_writer_->open_data<Transaction>(0);
109        from_xtp(*tbt_data, transaction);
110        transaction_band_writer_->close_data();
111    }
112}