快速开始

交易任务

从零开始构建一个交易任务

1. 根据目录结构创建文件

源码目录结构:

kfx-task-condition-demo/                    # 交易任务文件名(英文名称,随意)
├── src/
│   └── python
│       └── ConditionOrder                  # 交易任务名称(英文名称,随意)
│           └── __init__.py                 # python交易任务策略代码
├── README.md                               # 交易任务说明
└── package.json                            # 编译配置信息

交易任务实现代码 __init__.py

  1import kungfu
  2from kungfu.wingchun.constants import *
  3import json
  4import time
  5import math
  6import threading
  7from datetime import datetime
  8from pykungfu import wingchun as wc
  9
 10yjj = kungfu.__binding__.yijinjing
 11
 12
 13class Config(object):
 14    def __init__(self, param):
 15        sourceAccountList = param["accountId"].split("_")
 16        self.marketSource = param["marketSource"]
 17        exchangeTicker = param["ticker"].split("_")
 18        self.side = Side(param["side"])
 19        self.offset = Offset(param["offset"])
 20        self.priceType = PriceType(param["priceType"])
 21        self.volume = int(param["volume"])
 22        self.maxLot = int(param.get("maxLot", 0))
 23        self.startTime = str_to_nanotime(param.get("startTime", "0"))
 24        self.orderPrice = param["orderPrice"]
 25        self.source = ""
 26        if len(sourceAccountList) == 2 and len(exchangeTicker) == 5:
 27            self.source = sourceAccountList[0]
 28            self.account = sourceAccountList[1]
 29            self.exchange = exchangeTicker[0]
 30            self.ticker = exchangeTicker[1]
 31        self.priceCondition = param["priceCondition"]
 32
 33
 34class PriceCondition(object):
 35    def __init__(self, param):
 36        self.currentPrice = int(param["currentPrice"])
 37        self.compare = int(param["compare"])
 38        self.triggerPrice = float(param["triggerPrice"])
 39
 40
 41def update_strategy_state(state, value, context):
 42    strategy_state = lf.types.StrategyStateUpdate()
 43
 44    if state == lf.enums.StrategyState.Normal:
 45        strategy_state.value = str(value)
 46        context.log.info(str(value))
 47    elif state == lf.enums.StrategyState.Warn:
 48        strategy_state.value = str(value)
 49        context.log.warn(str(value))
 50    else:
 51        strategy_state.value = str(value)
 52        context.log.error(str(value))
 53
 54    strategy_state.state = state
 55
 56    context.update_strategy_state(strategy_state)
 57
 58
 59def pre_start(context):
 60    context.MIN_VOL = 0
 61    context.time_trigger = False
 62    context.price = -1.0
 63    context.order_placed = False
 64    context.log.info("参数 {}".format(context.arguments))
 65    args_dict = json.loads(context.arguments)
 66
 67    context.config = Config(args_dict)
 68    context.trigger_info = ""
 69    if context.config.startTime > 0:
 70        date_time_for_nano = datetime.fromtimestamp(
 71            context.config.startTime / (10**9)
 72        )
 73        time_str = date_time_for_nano.strftime("%Y-%m-%d %H:%M:%S.%f")
 74        context.trigger_info = "时间满足" + time_str
 75    if (not context.config.priceCondition) and context.config.startTime == 0:
 76        update_strategy_state(
 77            lf.enums.StrategyState.Error,
 78            "触发时间和触发价格没设置.",
 79            context,
 80        )
 81        context.log.info("触发时间和触发价格都没设置")
 82        context.req_deregister()
 83        return
 84    if context.config.source:
 85        context.add_account(context.config.source, context.config.account)
 86        context.subscribe(
 87            context.config.marketSource,
 88            [context.config.ticker],
 89            context.config.exchange,
 90        )
 91
 92        update_strategy_state(
 93            lf.enums.StrategyState.Normal,
 94            "正常",
 95            context,
 96        )
 97
 98    ins_type = wc.utils.get_instrument_type(
 99        context.config.exchange, context.config.ticker
100    )
101    context.log.info("(标的类型) {}".format(ins_type))
102    if context.MIN_VOL == 0:
103        context.MIN_VOL = type_to_minvol(ins_type)
104
105
106def str_to_nanotime(tm):
107    if tm is None or tm == "" or tm == "Invalid Date":
108        return 0
109    if tm.isdigit():  # in milliseconds
110        return int(tm) * 10**6
111    else:
112        year_month_day = time.strftime("%Y-%m-%d", time.localtime())
113        ymdhms = year_month_day + " " + tm.split(" ")[1]
114        timeArray = time.strptime(ymdhms, "%Y-%m-%d %H:%M:%S")
115        nano = int(time.mktime(timeArray) * 10**9)
116        return nano
117
118
119def type_to_minvol(argument):
120    switcher = {
121        InstrumentType.Stock: int(100),
122        InstrumentType.Future: int(1),
123        InstrumentType.Bond: int(1),
124        InstrumentType.StockOption: int(1),
125        InstrumentType.Fund: int(1),
126        InstrumentType.TechStock: int(200),
127        InstrumentType.Index: int(1),
128    }
129    return switcher.get(argument, int(1))
130
131
132def place_order(context):
133    if not context.order_placed:
134        if context.price < 0:
135            update_strategy_state(
136                lf.enums.StrategyState.Warn,
137                "没有收到行情",
138                context,
139            )
140            context.log.error("没有收到行情, 无法下单, 请检查行情连接")
141            context.req_deregister()
142            return
143
144        rest_volume = context.config.volume
145        if context.config.maxLot == 0 or context.config.maxLot >= context.config.volume:
146            order_volume = rest_volume
147        else:
148            order_volume = context.config.maxLot
149        order_volume = int(
150            math.ceil(float(order_volume) / context.MIN_VOL) * context.MIN_VOL
151        )
152        i_order = 0
153        vol_list = dict()
154        now_nano = time.time_ns()
155        while rest_volume > 0:
156            i_order += 1
157            volume = (
158                order_volume
159                if order_volume <= rest_volume
160                else int(
161                    math.ceil(float(rest_volume) / context.MIN_VOL) * context.MIN_VOL
162                )
163            )
164            order_id = context.insert_order(
165                context.config.ticker,
166                context.config.exchange,
167                context.config.source,
168                context.config.account,
169                context.price,
170                volume,
171                context.config.priceType,
172                context.config.side,
173                context.config.offset,
174            )
175            rest_volume -= order_volume
176            vol_list[order_id] = volume
177        context.order_placed = True
178        date_time_for_nano = datetime.fromtimestamp(now_nano / (10**9))
179        time_str = date_time_for_nano.strftime("%Y-%m-%d %H:%M:%S.%f")
180        context.log.info(
181            "-------------------- {} 开始下单 时间 {} --------------------".format(
182                context.trigger_info, time_str
183            )
184        )
185        for key, val in vol_list.items():
186            context.log.info("订单号 {}, 下单数量 {} 下单价格 {}".format(key, val, context.price))
187
188        update_strategy_state(
189            lf.enums.StrategyState.Normal,
190            "下单完成, 退出任务",
191            context,
192        )
193        context.log.info("下单完成, 退出任务")
194        context.req_deregister()
195
196
197def post_start(context):
198    start = context.config.startTime - 60000000
199
200    if context.config.startTime > 0:
201        context.add_timer(context.config.startTime, lambda ctx, event: place_order(ctx))
202
203
204def on_quote(context, quote, source_location, dest):
205    if context.config.orderPrice == "0":
206        context.price = quote.last_price
207    elif context.config.orderPrice == "1":
208        if context.config.side == Side.Buy:
209            context.price = quote.ask_price[0]
210        else:
211            context.price = quote.bid_price[0]
212    elif context.config.orderPrice == "2":
213        if context.config.side == Side.Buy:
214            context.price = quote.bid_price[0]
215        else:
216            context.price = quote.ask_price[0]
217
218    if context.config.priceCondition:
219        for i, item in enumerate(context.config.priceCondition):
220            is_price_triggerred = True
221            if item["currentPrice"] == "1":
222                quote_price = quote.bid_price[0]
223            elif item["currentPrice"] == "-1":
224                quote_price = quote.ask_price[0]
225            else:
226                quote_price = quote.last_price
227            if item["compare"] == "1":
228                is_price_triggerred = quote_price >= float(item["triggerPrice"])
229                if is_price_triggerred:
230                    context.trigger_info = "价格大于等于" + str(item["triggerPrice"])
231            elif item["compare"] == "2":
232                is_price_triggerred = quote_price > float(item["triggerPrice"])
233                if is_price_triggerred:
234                    context.trigger_info = "价格大于" + str(item["triggerPrice"])
235            elif item["compare"] == "3":
236                is_price_triggerred = quote_price <= float(item["triggerPrice"])
237                if is_price_triggerred:
238                    context.trigger_info = "价格小于等于" + str(item["triggerPrice"])
239            elif item["compare"] == "4":
240                is_price_triggerred = quote_price < float(item["triggerPrice"])
241                if is_price_triggerred:
242                    context.trigger_info = "价格小于" + str(item["triggerPrice"])
243            else:
244                return
245            if not is_price_triggerred:
246                return
247        place_order(context)

配置文件package.json

  1{
  2    "name": "@kungfu-trader/kfx-task-condition",
  3    "author": {
  4        "name": "kungfu-trader",
  5        "email": "info@kungfu.link"
  6    },
  7    "kungfuBuild": {
  8        "python": {
  9            "dependencies": {}
 10        }
 11    },
 12    "kungfuConfig": {
 13        "key": "ConditionOrder",
 14        "name": "条件单",
 15        "ui_config": {
 16            "position": "make_order"
 17        },
 18        "language": {
 19            "zh-CN": {
 20                "accountId": "账户",
 21                "marketSource": "行情",
 22                "ticker": "标的",
 23                "side": "买卖",
 24                "offset": "开平",
 25                "priceType": "下单类型",
 26                "priceCondition": "价格条件",
 27                "currentPrice": "当前价格",
 28                "currentPrice_0": "买一价",
 29                "currentPrice_1": "卖一价",
 30                "currentPrice_2": "最新价",
 31                "compare": "比较符",
 32                "triggerPrice": "触发价格",
 33                "orderPrice": "下单价格",
 34                "orderPrice_0": "最新价",
 35                "orderPrice_1": "对手价一档",
 36                "orderPrice_2": "同方向一档",
 37                "volume": "数量",
 38                "maxLot": "单次最大手数",
 39                "maxLotTip": "柜台允许的单次最大手数, 以此为基础进行拆单, 不填则表示柜台无限制, 股票请填100的整数倍, 否则自动向上取整, 小于100则会强制设成100",
 40                "startTime": "触发时间"
 41            },
 42            "en-US": {
 43                "accountId": "Account Id",
 44                "marketSource": "Market Source",
 45                "ticker": "Ticker",
 46                "side": "Side",
 47                "offset": "Offset",
 48                "priceType": "Price Type",
 49                "priceCondition": "Price Condition",
 50                "currentPrice": "Current Price",
 51                "currentPrice_0": "Buy First Price",
 52                "currentPrice_1": "Sell First Price",
 53                "currentPrice_2": "Latest Price",
 54                "compare": "Compare",
 55                "triggerPrice": "Trigger Price",
 56                "orderPrice": "Order Price",
 57                "orderPrice_0": "Latest Price",
 58                "orderPrice_1": "Opponent First Level Price",
 59                "orderPrice_2": "Same Side First Level Price",
 60                "volume": "Volume",
 61                "maxLot": "Max Lot",
 62                "maxLotTip": "The single max hands that counter allow, this is the basis for the dismantling of the order. If you don't fill in the form, it means the counter is unlimited. Please fill in an integer multiple of 100, otherwise it will be rounded up automatically. If it is less than 100, it will be set to 100.",
 63                "startTime": "Trigger Time"
 64            }
 65        },
 66        "config": {
 67            "strategy": {
 68                "type": "trade",
 69                "settings": [
 70                    {
 71                        "key": "accountId",
 72                        "name": "ConditionOrder.accountId",
 73                        "type": "td",
 74                        "required": true,
 75                        "showArg": true
 76                    },
 77                    {
 78                        "key": "marketSource",
 79                        "name": "ConditionOrder.marketSource",
 80                        "type": "md",
 81                        "required": true,
 82                        "showArg": true
 83                    },
 84                    {
 85                        "key": "ticker",
 86                        "name": "ConditionOrder.ticker",
 87                        "type": "instrument",
 88                        "required": true,
 89                        "showArg": true
 90                    },
 91                    {
 92                        "key": "side",
 93                        "name": "ConditionOrder.side",
 94                        "type": "side",
 95                        "default": 0,
 96                        "required": true,
 97                        "showArg": true
 98                    },
 99                    {
100                        "key": "offset",
101                        "name": "ConditionOrder.offset",
102                        "type": "offset",
103                        "default": 0,
104                        "required": true,
105                        "showArg": true
106                    },
107                    {
108                        "key": "priceType",
109                        "name": "ConditionOrder.priceType",
110                        "type": "priceType",
111                        "default": "1",
112                        "required": false
113                    },
114                    {
115                        "key": "priceCondition",
116                        "name": "ConditionOrder.priceCondition",
117                        "type": "table",
118                        "columns": [
119                            {
120                                "key": "currentPrice",
121                                "name": "ConditionOrder.currentPrice",
122                                "type": "select",
123                                "options": [
124                                    {
125                                        "label": "ConditionOrder.currentPrice_0",
126                                        "value": "1"
127                                    },
128                                    {
129                                        "label": "ConditionOrder.currentPrice_1",
130                                        "value": "-1"
131                                    },
132                                    {
133                                        "label": "ConditionOrder.currentPrice_2",
134                                        "value": "0"
135                                    }
136                                ],
137                                "default": "0",
138                                "required": true
139                            },
140                            {
141                                "key": "compare",
142                                "name": "ConditionOrder.compare",
143                                "type": "select",
144                                "options": [
145                                    {
146                                        "label": ">=",
147                                        "value": "1"
148                                    },
149                                    {
150                                        "label": ">",
151                                        "value": "2"
152                                    },
153                                    {
154                                        "label": "<=",
155                                        "value": "3"
156                                    },
157                                    {
158                                        "label": "<",
159                                        "value": "4"
160                                    }
161                                ],
162                                "default": "1",
163                                "required": true
164                            },
165                            {
166                                "key": "triggerPrice",
167                                "name": "ConditionOrder.triggerPrice",
168                                "type": "float",
169                                "required": true
170                            }
171                        ],
172                        "required": false
173                    },
174                    {
175                        "key": "orderPrice",
176                        "name": "ConditionOrder.orderPrice",
177                        "type": "select",
178                        "options": [
179                            {
180                                "label": "ConditionOrder.orderPrice_0",
181                                "value": "0"
182                            },
183                            {
184                                "label": "ConditionOrder.orderPrice_1",
185                                "value": "1"
186                            },
187                            {
188                                "label": "ConditionOrder.orderPrice_2",
189                                "value": "2"
190                            }
191                        ],
192                        "required": true
193                    },
194                    {
195                        "key": "volume",
196                        "name": "ConditionOrder.volume",
197                        "type": "int",
198                        "min": 0,
199                        "required": true
200                    },
201                    {
202                        "key": "maxLot",
203                        "name": "ConditionOrder.maxLot",
204                        "type": "int",
205                        "min": 0,
206                        "tip": "ConditionOrder.maxLotTip",
207                        "required": false,
208                        "default": 0
209                    },
210                    {
211                        "key": "startTime",
212                        "name": "ConditionOrder.startTime",
213                        "type": "timePicker",
214                        "required": false
215                    }
216                ]
217            }
218        }
219    }
220}

说明文档 README.md

1条件单逻辑说明 :
2
3- 条件单可以接受两个类型的条件为约束,一个是价格条件,一个是时间条件
4- 当仅有价格条件时 会在当前价格与触发价格满足所设置的比较条件(大于等于、大于、小于等于、小于)时下单
5- 当仅有时间条件时 会在到达目标设定时间点时下单
6- 当价格条件跟时间条件同时存在时,哪个条件先满足,以哪个条件下单
7- 单次最大手数:若设置下单数量为1000,而单笔最大下单量为100,则会在下单时拆为10笔,每笔100,一同下出。

2. 编译生成二进制文件

编译流程:

(1)在命令行中进入交易任务文件根目录下

$ cd kfx-task-condition-demo/
# 举例交易任务文件名为 kfx-task-condition-demo

(2)执行kfs编译命令

$ kfs extension build

注意

  • 执行编译的kfs命令路径为功夫的安装目录下 /Kungfu/resources/kfc/kfs

  • 举例: Windows系统下,功夫安装路径为D盘的根目录,即功夫安装目录为 D:/Kungfu

  • 编译命令为 : D:/Kungfu/resources/kfc/kfs.exe extension build

  • 举例: linux系统下,功夫安装路径为/opt/Kungfu

  • 编译命令为 : /opt/Kungfu/resources/kfc/kfs extension build

编译后文件目录结构:

kfx-task-condition-demo/
├── src/
│   └── python
│       └── ConditionOrder
│           └── __init__.py
├── README.md
├── package.json
├── __pypackages__/                                         # Python模块库, 自动生成
├── dist/                                                   # 编译打包出来的二进制文件所在文件夹
│   └── ConditionOrder
│       └── ConditionOrder.cp39-win_amd64.pyd               # 二进制文件
├── pdm.lock                                                # build后下载依赖库自动生成的文件
└── pyproject.toml                                          # build后下载依赖库自动生成的文件

3. 将文件拷贝到插件目录

流程:

(1)在命令行中进入交易任务文件根目录下

$ cd kfx-task-condition-demo/
# 举例交易任务文件名为 kfx-task-condition-demo

(2)将二进制文件所在的目录拷贝到Kungfu插件目录

注意

  • 插件目录路径为功夫的安装目录下 /Kungfu/resources/app/kungfu-extensions/

  • Windows系统 : Copy-Item -Path ./dist/ConditionOrder/ -Destination D:/Kungfu/resources/app/kungfu-extensions/ConditionOrder/ -Recurse -Force

  • Linux系统 : cp -r ./dist/ConditionOrder/ /opt/Kungfu/resources/app/kungfu-extensions/


4. 添加交易任务

重启Kungfu图形客户端,选择主面板中的“交易任务”面板,点击右上角的“添加”按钮,在弹出的“选择交易任务”面板中选择“条件单”

_images/%E6%9D%A1%E4%BB%B6%E5%8D%95.png

交易任务和策略的区别

| 功能描述 | 交易任务 | 策略 |
| --- | --- | --- |
| 前端参数 | 可传参(交易任务可以接收前端配置的参数,前端配置的参数会以一个json字符串格式传入到 context.arguments。) | 不可传参 |
| 交易进度信息统计展示 | 展示进度信息(交易任务可以在成交和委托回调中,统计成交进度,或是属于交易任务特有的指标,前端界面可以显示这些信息,同时还可以进行异常报警提示。) | 不展示 |

柜台对接

从零开始构建一个交易柜台

创建目录结构

源码目录结构:

xtp/                                        # xtp柜台名称
├── src/
│   └── cpp
│       ├── buffer_data.h
│       ├── exports.cpp
│       ├── marketdata_xtp.cpp
│       ├── marketdata_xtp.h
│       ├── serialize_xtp.h
│       ├── trader_xtp.cpp
│       ├── trader_xtp.h
│       └── type_convert.h
└── package.json                            # 编译配置信息

package.json文件

  1{
  2    "name": "@kungfu-trader/kfx-broker-xtp-demo",
  3    "author": {
  4        "name": "Kungfu Trader",
  5        "email": "info@kungfu.link"
  6    },
  7    "version": "3.0.6-alpha.4",
  8    "description": "Kungfu Extension - XTP Demo",
  9    "license": "Apache-2.0",
 10    "main": "package.json",
 11    "repository": {
 12        "url": "https://github.com/kungfu-trader/kungfu.git"
 13    },
 14    "publishConfig": {
 15        "registry": "https://npm.pkg.github.com"
 16    },
 17    "binary": {
 18        "module_name": "kfx-broker-xtp-demo",
 19        "module_path": "dist/xtp",
 20        "remote_path": "{module_name}/v{major}/v{version}",
 21        "package_name": "{module_name}-v{version}-{platform}-{arch}-{configuration}.tar.gz",
 22        "host": "https://prebuilt.libkungfu.cc"
 23    },
 24    "scripts": {
 25        "build": "C:/Users/PC/Documents/kfgit/v27/kf27/artifact/build/stage/artifact-kungfu/v2/v3.0.6-alpha.4/win-unpacked/resources/kfc/kfs.exe extension build", // 替换成你的Kungfu安装目录
 26        "clean": "C:/Users/PC/Documents/kfgit/v27/kf27/artifact/build/stage/artifact-kungfu/v2/v3.0.6-alpha.4/win-unpacked/resources/kfc/kfs.exe extension clean", // 替换成你的Kungfu安装目录
 27        "format": "node ../../framework/core/.gyp/run-format-cpp.js src",
 28        "install": "node -e \"require('@kungfu-trader/kungfu-core').prebuilt('install')\"",
 29        "package": "kfs extension package"
 30    },
 31    "dependencies": {
 32        "@kungfu-trader/kungfu-core": "^3.0.6-alpha.4"
 33    },
 34    "devDependencies": {
 35        "@kungfu-trader/kungfu-sdk": "^3.0.6-alpha.4"
 36    },
 37    "kungfuDependencies": {
 38        "xtp": "v2.2.37.4"
 39    },
 40    "kungfuBuild": {
 41        "build_type": "Release",
 42        "cpp": {
 43            "target": "bind/python",
 44            "links": {
 45                "windows": [
 46                    "xtptraderapi",
 47                    "xtpquoteapi"
 48                ],
 49                "linux": [
 50                    "xtptraderapi",
 51                    "xtpquoteapi"
 52                ],
 53                "macos": [
 54                    "xtptraderapi",
 55                    "xtpquoteapi"
 56                ]
 57            }
 58        }
 59    },
 60    "kungfuConfig": {
 61        "key": "xtp",
 62        "name": "XTP",
 63        "language": {
 64            "zh-CN": {
 65                "account_name": "账户别名",
 66                "account_name_tip": "请填写账户别名",
 67                "account_id": "账户",
 68                "account_id_tip": "请填写账户, 例如:1504091",
 69                "password": "密码",
 70                "password_tip": "请填写密码, 例如:123456",
 71                "software_key": "软件密钥",
 72                "software_key_tip": "请填写软件密钥, 例如:b8aa7173bba3470e390d787219b2112",
 73                "td_ip": "交易IP",
 74                "td_ip_tip": "请填写交易IP, 例如:61.152.102.111",
 75                "td_port": "交易端口",
 76                "td_port_tip": "请填写交易端口, 例如:8601",
 77                "client_id": "客户ID",
 78                "client_id_tip": "请填写自定义多点登录ID 1-99整数,不同节点之间不要重复",
 79                "md_ip": "行情IP",
 80                "md_ip_tip": "请填写行情IP, 例如:61.152.102.110",
 81                "md_port": "行情端口",
 82                "md_port_tip": "请填写行情端口, 例如:8602",
 83                "protocol": "协议",
 84                "protocol_tip": "请选择协议, tcp 或者 udp",
 85                "buffer_size": "缓冲区大小",
 86                "buffer_size_tip": "请填写 缓冲区大小(mb)",
 87                "sync_external_order": "同步外部订单",
 88                "sync_external_order_msg": "是否同步外部订单",
 89                "sync_external_order_tip": "若开启则同步用户在其他交易软件的订单",
 90                "recover_order_trade": "恢复订单",
 91                "recover_order_trade_msg": "启动时是否查询恢复订单",
 92                "recover_order_trade_tip": "若开启则启动时查询今日委托和成交",
 93                "query_instruments": "查询可交易标的",
 94                "query_instruments_tip": "是否查询可交易标的, 开启后会查询所有可交易标的, 流量太大频繁查询可能导致账号或ip被XTP拉黑"
 95            },
 96            "en-US": {
 97                "account_name": "account name",
 98                "account_name_tip": "Please enter account name",
 99                "account_id": "account id",
100                "account_id_tip": "Please enter account id",
101                "password": "password",
102                "password_tip": "Please enter password, for example:123456",
103                "software_key": "software key",
104                "software_key_tip": "Please enter software key, for example :b8aa7173bba3470e390d787219b2112",
105                "td_ip": "td IP",
106                "td_ip_tip": "Please enter td IP, for example:61.152.102.111",
107                "td_port": "td port",
108                "td_port_tip": "Please enter td port, for example:8601",
109                "client_id": "client id",
110                "client_id_tip": "Please enter a user-defined multipoint client ID, which is an integer ranging from 1 to 99. The value must be unique on different nodes",
111                "md_ip": "md IP",
112                "md_ip_tip": "Please enter md IP, for example :61.152.102.110",
113                "md_port": "md port",
114                "md_port_tip": "Please enter md port, for example:8602",
115                "protocol": "protocol",
116                "protocol_tip": "Please select protocol, tcp or udp",
117                "buffer_size": "buffer size",
118                "buffer_size_tip": "Please enter buffer size(mb)",
119                "sync_external_order": "sync external order",
120                "sync_external_order_msg": "Whether open sync_external_order",
121                "sync_external_order_tip": "If enabled, it synchronizes users' orders in other trading software",
122                "recover_order_trade": "recover order trade",
123                "recover_order_trade_msg": "Whether recover order trade",
124                "recover_order_trade_tip": "If enabled, query order and trade when TD ready",
125                "query_instruments": "query instruments",
126                "query_instruments_tip": "If enabled, query instruments. too much information may result in account or ip blacklisted by XTP"
127            }
128        },
129        "config": {
130            "td": {
131                "type": [
132                    "stock"
133                ],
134                "settings": [
135                    {
136                        "key": "account_name",
137                        "name": "xtp.account_name",
138                        "type": "str",
139                        "tip": "xtp.account_name_tip"
140                    },
141                    {
142                        "key": "account_id",
143                        "name": "xtp.account_id",
144                        "type": "str",
145                        "required": true,
146                        "primary": true,
147                        "tip": "xtp.account_id_tip"
148                    },
149                    {
150                        "key": "password",
151                        "name": "xtp.password",
152                        "type": "password",
153                        "required": true,
154                        "tip": "xtp.password_tip"
155                    },
156                    {
157                        "key": "software_key",
158                        "name": "xtp.software_key",
159                        "type": "str",
160                        "required": true,
161                        "tip": "xtp.software_key_tip"
162                    },
163                    {
164                        "key": "td_ip",
165                        "name": "xtp.td_ip",
166                        "type": "str",
167                        "required": true,
168                        "tip": "xtp.td_ip_tip"
169                    },
170                    {
171                        "key": "td_port",
172                        "name": "xtp.td_port",
173                        "type": "int",
174                        "required": true,
175                        "tip": "xtp.td_port_tip"
176                    },
177                    {
178                        "key": "client_id",
179                        "name": "xtp.client_id",
180                        "type": "int",
181                        "required": true,
182                        "tip": "xtp.client_id_tip"
183                    },
184                    {
185                        "key": "sync_external_order",
186                        "name": "xtp.sync_external_order",
187                        "type": "bool",
188                        "errMsg": "xtp.sync_external_order_msg",
189                        "required": false,
190                        "default": false,
191                        "tip": "xtp.sync_external_order_tip"
192                    },
193                    {
194                        "key": "recover_order_trade",
195                        "name": "xtp.recover_order_trade",
196                        "type": "bool",
197                        "errMsg": "xtp.recover_order_trade_msg",
198                        "required": false,
199                        "default": false,
200                        "tip": "xtp.recover_order_trade_tip"
201                    }
202                ]
203            },
204            "md": {
205                "type": [
206                    "stock"
207                ],
208                "settings": [
209                    {
210                        "key": "account_id",
211                        "name": "xtp.account_id",
212                        "type": "str",
213                        "required": true,
214                        "tip": "xtp.account_id_tip",
215                        "default": "15011218"
216                    },
217                    {
218                        "key": "password",
219                        "name": "xtp.password",
220                        "type": "password",
221                        "required": true,
222                        "tip": "xtp.password_tip",
223                        "default": "PsVqy99v"
224                    },
225                    {
226                        "key": "md_ip",
227                        "name": "xtp.md_ip",
228                        "type": "str",
229                        "required": true,
230                        "tip": "xtp.md_ip_tip",
231                        "default": "119.3.103.38"
232                    },
233                    {
234                        "key": "md_port",
235                        "name": "xtp.md_port",
236                        "type": "int",
237                        "required": true,
238                        "tip": "xtp.md_port_tip",
239                        "default": 6002
240                    },
241                    {
242                        "key": "protocol",
243                        "name": "xtp.protocol",
244                        "type": "select",
245                        "options": [
246                            {
247                                "value": "tcp",
248                                "label": "tcp"
249                            },
250                            {
251                                "value": "udp",
252                                "label": "udp"
253                            }
254                        ],
255                        "required": false,
256                        "tip": "xtp.protocol_tip",
257                        "default": "tcp"
258                    },
259                    {
260                        "key": "buffer_size",
261                        "name": "xtp.buffer_size",
262                        "type": "int",
263                        "tip": "xtp.buffer_size_tip",
264                        "required": false
265                    },
266                    {
267                        "key": "client_id",
268                        "name": "xtp.client_id",
269                        "type": "int",
270                        "required": true,
271                        "tip": "xtp.client_id_tip",
272                        "default": 23
273                    },
274                    {
275                        "key": "query_instruments",
276                        "name": "xtp.query_instruments",
277                        "type": "bool",
278                        "required": false,
279                        "tip": "xtp.query_instruments_tip",
280                        "default": false
281                    }
282                ]
283            }
284        }
285    }
286}

serialize_xtp.h文件

为XTP数据结构添加to_string,方便打印日志查看数据

 1#ifndef KUNGFU_SERIALIZE_XTP_H
 2#define KUNGFU_SERIALIZE_XTP_H
 3#include <nlohmann/json.hpp>
 4#include <xtp_api_struct.h>
 5namespace nlohmann { // each macro below generates to_json/from_json covering exactly the listed members of that XTP struct
 6NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(XTPQueryOrderRsp, order_xtp_id, order_client_id, order_cancel_client_id,
 7                                order_cancel_xtp_id, ticker, market, price, quantity, price_type, side,
 8                                position_effect, reserved1, reserved2, business_type, qty_traded, qty_left,
 9                                insert_time, update_time, cancel_time, trade_amount, order_local_id, order_status,
10                                order_submit_status, order_type);
11NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(XTPOrderInsertInfo, order_xtp_id, order_client_id, ticker, market, price, stop_price,
12                                quantity, price_type, side, position_effect, reserved1, reserved2, business_type);
13NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(XTPTradeReport, order_xtp_id, order_client_id, ticker, market, local_order_id,
14                                exec_id, price, quantity, trade_time, trade_amount, report_index, order_exch_id,
15                                trade_type, side, position_effect, reserved1, reserved2, business_type, branch_pbu);
16NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(XTPOrderCancelInfo, order_cancel_xtp_id, order_xtp_id);
17NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(XTPQueryStkPositionRsp, ticker, ticker_name, market, total_qty, sellable_qty,
18                                avg_price, unrealized_pnl, yesterday_position, purchase_redeemable_qty,
19                                position_direction, position_security_type, executable_option, lockable_position,
20                                executable_underlying, locked_position, usable_locked_position, profit_price,
21                                buy_cost, profit_cost, unknown);
22NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(XTPQueryAssetRsp, total_asset, buying_power, security_asset, fund_buy_amount,
23                                fund_buy_fee, fund_sell_amount, fund_sell_fee, withholding_amount, account_type,
24                                frozen_margin, frozen_exec_cash, frozen_exec_fee, pay_later, preadva_pay,
25                                orig_banlance, banlance, deposit_withdraw, trade_netting, captial_asset,
26                                force_freeze_amount, preferred_amount, repay_stock_aval_banlance,
27                                exchange_cur_risk_degree, company_cur_risk_degree, unknown);
28NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(XTPMarketDataStruct, exchange_id, ticker, last_price, pre_close_price, open_price,
29                                high_price, low_price, close_price, pre_total_long_positon, total_long_positon,
30                                pre_settl_price, settl_price, upper_limit_price, lower_limit_price, pre_delta,
31                                curr_delta, data_time, qty, turnover, avg_price, bid, ask, bid_qty, ask_qty,
32                                trades_count, ticker_status);
33NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(XTPRspInfoStruct, error_id, error_msg);
34NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(XTPOrderInfoEx, order_xtp_id, order_client_id, order_cancel_client_id,
35                                order_cancel_xtp_id, ticker, market, price, quantity, price_type, business_type,
36                                qty_traded, qty_left, insert_time, update_time, cancel_time, trade_amount,
37                                order_local_id, order_status, order_submit_status, order_type, order_exch_id,
38                                order_err_t, unknown);
39NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(XTPSpecificTickerStruct, exchange_id, ticker);
40} // namespace nlohmann
41namespace kungfu::wingchun::xtp {
42template <typename T> std::string to_string(const T &ori) { // Render any value that has a to_json overload as JSON text.
43    nlohmann::json j;
44    to_json(j, ori); // unqualified call: the overload is found by argument-dependent lookup
45    return j.dump(); // dump() without arguments yields compact, single-line JSON
46}
47} // namespace kungfu::wingchun::xtp
48#endif

type_convert.h文件

XTP和Kungfu数据结构的转换

  1#ifndef KUNGFU_XTP_EXT_TYPE_CONVERT_H
  2#define KUNGFU_XTP_EXT_TYPE_CONVERT_H
  3
  4#include <cstddef>
  5#include <cstdio>
  6#include <cstring>
  7#include <ctime>
  8#include <kungfu/longfist/longfist.h>
  9#include <kungfu/wingchun/common.h>
 10#include <kungfu/yijinjing/time.h>
 11#include <nlohmann/json.hpp>
 12#include <xtp_api_struct.h>
 13
 14using namespace kungfu::longfist;
 15using namespace kungfu::longfist::enums;
 16using namespace kungfu::longfist::types;
 17
 18namespace kungfu::wingchun::xtp {
 19
 20template <typename T> inline void set_offset(T &t) { // Derive t.offset from t.side; T must expose both members.
 21    switch (t.side) {
 22    case Side::Buy:
 23        t.offset = Offset::Open; // a buy is recorded as an open
 24        break;
 25    case Side::Sell:
 26        t.offset = Offset::Close; // a sell is recorded as a close
 27        break;
 28    default:
 29        SPDLOG_ERROR("Invalidated kf_side : {} ", t.side); // any other side is only logged; t.offset stays unchanged
 30        break;
 31    }
 32}
 33
 34inline XTP_PROTOCOL_TYPE get_xtp_protocol_type(const std::string &p) { // Map a protocol name to the XTP protocol enum.
 35    if (p == "udp") {
 36        return XTP_PROTOCOL_UDP;
 37    } else {
 38        return XTP_PROTOCOL_TCP; // every string other than exactly "udp" (case-sensitive) selects TCP
 39    }
 40}
 41
 42inline int64_t nsec_from_xtp_timestamp(int64_t xtp_time) { // Decode a decimal YYYYMMDDHHMMSSmmm value into nanoseconds.
 43    std::tm result = {};
 44    result.tm_year = xtp_time / (int64_t)1e13 - 1900; // leading digits are the year; tm_year counts from 1900
 45    result.tm_mon = xtp_time % (int64_t)1e13 / (int64_t)1e11 - 1; // tm_mon is zero-based
 46    result.tm_mday = xtp_time % (int64_t)1e11 / (int64_t)1e9;
 47    result.tm_hour = xtp_time % (int64_t)1e9 / (int64_t)1e7;
 48    result.tm_min = xtp_time % (int)1e7 / (int)1e5;
 49    result.tm_sec = xtp_time % (int)1e5 / (int)1e3;
 50    int milli_sec = xtp_time % (int)1e3; // the last three digits are milliseconds
 51    std::time_t parsed_time = std::mktime(&result); // mktime interprets the fields as local time; a -1 failure result is not checked
 52    return parsed_time * kungfu::yijinjing::time_unit::NANOSECONDS_PER_SECOND +
 53        milli_sec * kungfu::yijinjing::time_unit::NANOSECONDS_PER_MILLISECOND;
 54}
 55
 56inline void from_xtp(const XTP_MARKET_TYPE &xtp_market_type, char *exchange_id) { // XTP market enum -> exchange id string.
 57    if (xtp_market_type == XTP_MKT_SH_A) {
 58        strcpy(exchange_id, "SSE"); // writes 4 bytes including the terminator; the caller's buffer must be that large
 59    } else if (xtp_market_type == XTP_MKT_SZ_A) {
 60        strcpy(exchange_id, "SZE");
 61    } // any other market leaves exchange_id untouched
 62}
 63
 64inline void to_xtp(XTP_MARKET_TYPE &xtp_market_type, const char *exchange_id) { // Exchange id string -> XTP market enum.
 65    if (!strcmp(exchange_id, "SSE")) { // strcmp returns 0 on an exact match
 66        xtp_market_type = XTP_MKT_SH_A;
 67    } else if (!strcmp(exchange_id, "SZE")) {
 68        xtp_market_type = XTP_MKT_SZ_A;
 69    } else {
 70        xtp_market_type = XTP_MKT_UNKNOWN; // anything else is reported as unknown
 71    }
 72}
 73
 74inline std::string exchange_id_from_xtp(const XTP_EXCHANGE_TYPE ex) { // XTP exchange enum -> exchange id, returned by value.
 75    if (ex == XTP_EXCHANGE_SH) {
 76        return EXCHANGE_SSE;
 77    } else if (ex == XTP_EXCHANGE_SZ) {
 78        return EXCHANGE_SZE;
 79    } else {
 80        return "Unknown"; // literal fallback for every other enum value
 81    }
 82}
 83
 84inline void from_xtp(const XTP_EXCHANGE_TYPE &xtp_exchange_type, char *exchange_id) { // XTP exchange enum -> exchange id string.
 85    if (xtp_exchange_type == XTP_EXCHANGE_SH) {
 86        strcpy(exchange_id, "SSE"); // writes 4 bytes including the terminator; the caller's buffer must be that large
 87    } else if (xtp_exchange_type == XTP_EXCHANGE_SZ) {
 88        strcpy(exchange_id, "SZE");
 89    } // any other exchange leaves exchange_id untouched
 90}
 91
 92inline void to_xtp_exchange(XTP_EXCHANGE_TYPE &xtp_exchange_type, const char *exchange_id) { // Exchange id string -> XTP exchange enum.
 93    if (strcmp(exchange_id, "SSE") == 0) {
 94        xtp_exchange_type = XTP_EXCHANGE_SH;
 95    } else if (strcmp(exchange_id, "SZE") == 0) {
 96        xtp_exchange_type = XTP_EXCHANGE_SZ;
 97    } else {
 98        xtp_exchange_type = XTP_EXCHANGE_UNKNOWN; // anything else is reported as unknown
 99    }
100}
101
102inline void from_xtp(const XTP_PRICE_TYPE &xtp_price_type, const XTP_MARKET_TYPE &xtp_exchange_type,
103                    PriceType &price_type) { // XTP price type -> kungfu PriceType; xtp_exchange_type is really the XTP market.
104    price_type = PriceType::Unknown; // default first, so an unmapped price type on SH/SZ no longer leaves a stale value
105    if (xtp_price_type == XTP_PRICE_LIMIT)
106        price_type = PriceType::Limit;
107    else if (xtp_price_type == XTP_PRICE_BEST5_OR_CANCEL)
108        price_type = PriceType::FakBest5;
109    else if (xtp_exchange_type == XTP_MKT_SH_A) { // Shanghai-only mapping
110        if (xtp_price_type == XTP_PRICE_BEST5_OR_LIMIT)
111            price_type = PriceType::ReverseBest;
112    } else if (xtp_exchange_type == XTP_MKT_SZ_A) { // Shenzhen-only mappings
113        if (xtp_price_type == XTP_PRICE_BEST_OR_CANCEL)
114            price_type = PriceType::Fak;
115        else if (xtp_price_type == XTP_PRICE_FORWARD_BEST)
116            price_type = PriceType::ForwardBest;
117        else if (xtp_price_type == XTP_PRICE_REVERSE_BEST_LIMIT)
118            price_type = PriceType::ReverseBest;
119        else if (xtp_price_type == XTP_PRICE_ALL_OR_CANCEL)
120            price_type = PriceType::Fok;
121    }
122}
123
124inline void to_xtp(XTP_PRICE_TYPE &xtp_price_type, const PriceType &price_type, const char *exchange) { // kungfu PriceType -> XTP price type.
125    xtp_price_type = XTP_PRICE_TYPE_UNKNOWN; // default first, so an unmapped price type on SSE/SZE is never left unassigned
126    if (price_type == PriceType::Limit)
127        xtp_price_type = XTP_PRICE_LIMIT;
128    else if ((price_type == PriceType::Any) || (price_type == PriceType::FakBest5)) // Any is sent as best-5-or-cancel
129        xtp_price_type = XTP_PRICE_BEST5_OR_CANCEL;
130    else if (strcmp(exchange, EXCHANGE_SSE) == 0) { // Shanghai-only mapping
131        if (price_type == PriceType::ReverseBest)
132            xtp_price_type = XTP_PRICE_BEST5_OR_LIMIT;
133    } else if (strcmp(exchange, EXCHANGE_SZE) == 0) { // Shenzhen-only mappings
134        if (price_type == PriceType::Fak)
135            xtp_price_type = XTP_PRICE_BEST_OR_CANCEL;
136        else if (price_type == PriceType::ForwardBest)
137            xtp_price_type = XTP_PRICE_FORWARD_BEST;
138        else if (price_type == PriceType::ReverseBest)
139            xtp_price_type = XTP_PRICE_REVERSE_BEST_LIMIT;
140        else if (price_type == PriceType::Fok)
141            xtp_price_type = XTP_PRICE_ALL_OR_CANCEL;
142    }
143}
144
145inline void from_xtp(const XTP_ORDER_STATUS_TYPE &xtp_order_status, OrderStatus &status) { // XTP order status -> kungfu OrderStatus.
146    if (xtp_order_status == XTP_ORDER_STATUS_INIT || xtp_order_status == XTP_ORDER_STATUS_NOTRADEQUEUEING) {
147        status = OrderStatus::Pending; // both the initial and the queued-without-fill states map to Pending
148    } else if (xtp_order_status == XTP_ORDER_STATUS_ALLTRADED) {
149        status = OrderStatus::Filled;
150    } else if (xtp_order_status == XTP_ORDER_STATUS_CANCELED) {
151        status = OrderStatus::Cancelled;
152    } else if (xtp_order_status == XTP_ORDER_STATUS_PARTTRADEDQUEUEING) {
153        status = OrderStatus::PartialFilledActive;
154    } else if (xtp_order_status == XTP_ORDER_STATUS_PARTTRADEDNOTQUEUEING) {
155        status = OrderStatus::PartialFilledNotActive;
156    } else if (xtp_order_status == XTP_ORDER_STATUS_REJECTED) {
157        status = OrderStatus::Error; // a rejected order is surfaced as Error
158    } else {
159        status = OrderStatus::Unknown; // every other XTP status
160    }
161}
162
163inline void from_xtp(XTPQSI *ticker_info, Instrument &instrument) { // Fill an Instrument from an XTP static quote record; ticker_info is dereferenced unchecked.
164    instrument.instrument_id = ticker_info->ticker;
165    if (ticker_info->exchange_id == 1) { // NOTE(review): literals 1/2 presumably equal XTP_EXCHANGE_SH/XTP_EXCHANGE_SZ - confirm
166        instrument.exchange_id = EXCHANGE_SSE;
167    } else if (ticker_info->exchange_id == 2) {
168        instrument.exchange_id = EXCHANGE_SZE;
169    } else {
170        instrument.exchange_id = "unknown";
171    }
172    memcpy(instrument.product_id, ticker_info->ticker_name, strlen(ticker_info->ticker_name)); // product_id receives the ticker name; no terminator is copied and the length is unbounded - NOTE(review): confirm product_id is pre-zeroed and large enough
173    instrument.instrument_type = get_instrument_type(instrument.exchange_id, instrument.instrument_id);
174    instrument.price_tick = ticker_info->price_tick;
175}
176
177inline void from_xtp(const XTP_SIDE_TYPE &xtp_side, Side &side) { // XTP side -> kungfu Side (buy/sell only).
178    if (xtp_side == XTP_SIDE_BUY) {
179        side = Side::Buy;
180    } else if (xtp_side == XTP_SIDE_SELL) {
181        side = Side::Sell;
182    } // any other XTP side leaves `side` unchanged
183}
184
185inline void to_xtp(XTP_SIDE_TYPE &xtp_side, const Side &side) { // kungfu Side -> XTP side (buy/sell only).
186    if (side == Side::Buy) {
187        xtp_side = XTP_SIDE_BUY;
188    } else if (side == Side::Sell) {
189        xtp_side = XTP_SIDE_SELL;
190    } // any other Side leaves xtp_side unchanged
191}
192
193inline void to_xtp(XTPMarketDataStruct &des, const Quote &ori) { // Placeholder: Quote -> XTP market data is not implemented.
194    // TODO: not implemented; des is left untouched
195}
196
197inline void from_xtp(const XTPMarketDataStruct &ori, Quote &des) { // Fill a kungfu Quote from an XTP market-data snapshot.
198    des.data_time = nsec_from_xtp_timestamp(ori.data_time);
199    des.instrument_id = ori.ticker;
200    from_xtp(ori.exchange_id, des.exchange_id);
201
202    des.instrument_type = ori.data_type != XTP_MARKETDATA_OPTION
203                            ? get_instrument_type(des.exchange_id, des.instrument_id)
204                            : InstrumentType::StockOption; // option snapshots are typed directly, everything else is looked up
205
206    des.last_price = ori.last_price;
207    des.pre_settlement_price = ori.pre_settl_price;
208    des.pre_close_price = ori.pre_close_price;
209    des.open_price = ori.open_price;
210    des.high_price = ori.high_price;
211    des.low_price = ori.low_price;
212    des.volume = ori.qty;
213    des.turnover = ori.turnover;
214    des.close_price = ori.close_price;
215    des.settlement_price = ori.settl_price;
216    des.upper_limit_price = ori.upper_limit_price;
217    des.lower_limit_price = ori.lower_limit_price;
218    des.total_trade_num = ori.trades_count;
219
220    memcpy(des.ask_price, ori.ask, sizeof(des.ask_price));
221    memcpy(des.bid_price, ori.bid, sizeof(des.bid_price)); // size taken from the destination actually written (was sizeof(des.ask_price))
222    for (std::size_t i = 0; i < 10; i++) { // volumes are copied element by element for 10 levels
223        des.ask_volume[i] = ori.ask_qty[i];
224        des.bid_volume[i] = ori.bid_qty[i];
225    }
226}
227
228inline void to_xtp(XTPOrderInsertInfo &des, const OrderInput &ori) { // Build an XTP order-insert request from a kungfu OrderInput.
229    strcpy(des.ticker, ori.instrument_id); // unbounded copy - NOTE(review): confirm des.ticker can always hold instrument_id
230    to_xtp(des.market, ori.exchange_id);
231    des.price = ori.limit_price;
232    des.quantity = ori.volume;
233    to_xtp(des.side, ori.side);
234    to_xtp(des.price_type, ori.price_type, ori.exchange_id);
235    des.business_type = XTP_BUSINESS_TYPE_CASH; // every order is sent with the cash business type
236}
237
238inline void from_xtp(const XTPOrderInsertInfo &ori, OrderInput &des) { // Placeholder: XTP order-insert -> OrderInput is not implemented.
239    // TODO: not implemented; des is left untouched
240}
241
242inline void from_xtp(const XTPOrderInfo &ori, Order &des) { // Fill a kungfu Order from an XTP order report, including price_type.
243    des.instrument_id = ori.ticker;
244    from_xtp(ori.market, des.exchange_id);
245    from_xtp(ori.price_type, ori.market, des.price_type);
246    des.volume = ori.quantity;
247    des.volume_left = ori.quantity - ori.qty_traded; // computed from the traded quantity; ori.qty_left is not used here
248    des.limit_price = ori.price;
249    from_xtp(ori.order_status, des.status);
250    from_xtp(ori.side, des.side);
251    set_offset(des); // offset is derived from the side that was just set
252    des.instrument_type = get_instrument_type(des.exchange_id, des.instrument_id);
253    if (ori.update_time > 0) { // update_time is only overwritten when XTP supplies one
254        des.update_time = nsec_from_xtp_timestamp(ori.update_time);
255    }
256    std::string str_external_order_id = std::to_string(ori.order_xtp_id);
257    des.external_order_id = str_external_order_id.c_str(); // the XTP order id is stored in decimal text form
258}
259
260inline void from_xtp(const XTPQueryOrderRsp &ori, HistoryOrder &des) { // Fill a kungfu HistoryOrder from an XTP order-query response.
261    des.instrument_id = ori.ticker;
262    from_xtp(ori.market, des.exchange_id);
263    from_xtp(ori.price_type, ori.market, des.price_type);
264    des.volume = ori.quantity;
265    des.volume_left = ori.qty_left;
266    des.limit_price = ori.price;
267    from_xtp(ori.order_status, des.status);
268    from_xtp(ori.side, des.side);
269    set_offset(des); // offset is derived from the side that was just set
270    des.instrument_type = get_instrument_type(des.exchange_id, des.instrument_id);
271    if (ori.update_time > 0) { // update_time is only overwritten when XTP supplies one
272        des.update_time = nsec_from_xtp_timestamp(ori.update_time);
273    }
274    des.external_order_id = std::to_string(ori.order_xtp_id).c_str(); // assignment, not the comma operator: the id was previously never stored
275}
276
277inline void from_xtp_no_price_type(const XTPOrderInfo &ori, Order &des) { // Same as from_xtp(XTPOrderInfo, Order) but leaves des.price_type untouched.
278    des.instrument_id = ori.ticker;
279    from_xtp(ori.market, des.exchange_id);
280    des.volume = ori.quantity;
281    des.volume_left = ori.quantity - ori.qty_traded; // computed from the traded quantity; ori.qty_left is not used here
282    des.limit_price = ori.price;
283    from_xtp(ori.order_status, des.status);
284    from_xtp(ori.side, des.side);
285    set_offset(des); // offset is derived from the side that was just set
286    des.instrument_type = get_instrument_type(des.exchange_id, des.instrument_id);
287    if (ori.update_time > 0) { // update_time is only overwritten when XTP supplies one
288        des.update_time = nsec_from_xtp_timestamp(ori.update_time);
289    }
290    std::string str_external_order_id = std::to_string(ori.order_xtp_id);
291    des.external_order_id = str_external_order_id.c_str(); // the XTP order id is stored in decimal text form
292}
293
294inline void from_xtp(const XTPTradeReport &ori, Trade &des) { // Fill a kungfu Trade from an XTP trade report.
295    des.instrument_id = ori.ticker;
296    des.volume = ori.quantity;
297    des.price = ori.price;
298    from_xtp(ori.market, des.exchange_id);
299    des.instrument_type = get_instrument_type(des.exchange_id, des.instrument_id);
300    from_xtp(ori.side, des.side);
301    set_offset(des); // offset is derived from the side that was just set
302    des.trade_time = yijinjing::time::now_in_nano(); // stamped with the current time; ori.trade_time is not used - NOTE(review): confirm this is intended
303    des.external_order_id = std::to_string(ori.order_xtp_id).c_str(); // the XTP order id is stored in decimal text form
304    des.external_trade_id = ori.exec_id;
305}
306
307inline void from_xtp(const XTPQueryTradeRsp &ori, HistoryTrade &des) { // Fill a kungfu HistoryTrade from an XTP trade-query response.
308    des.instrument_id = ori.ticker;
309    des.volume = ori.quantity;
310    des.price = ori.price;
311    from_xtp(ori.market, des.exchange_id);
312    from_xtp(ori.side, des.side);
313    //  des.offset = Offset::Open;
314    set_offset(des); // offset is derived from the side that was just set
315    des.instrument_type = get_instrument_type(des.exchange_id, des.instrument_id);
316    des.trade_time = nsec_from_xtp_timestamp(ori.trade_time); // unlike the live Trade overload, the XTP trade time is used here
317    des.external_order_id = std::to_string(ori.order_xtp_id).c_str(); // the XTP order id is stored in decimal text form
318    des.external_trade_id = ori.exec_id;
319}
320
321inline void from_xtp(const XTPQueryStkPositionRsp &ori, Position &des) { // Fill a kungfu Position from an XTP stock-position response.
322    des.instrument_id = ori.ticker;
323    from_xtp(ori.market, des.exchange_id);
324    des.volume = ori.total_qty;
325    des.yesterday_volume = ori.sellable_qty; // the sellable quantity is what is reported as yesterday_volume
326    des.avg_open_price = ori.avg_price; // avg_price feeds both price fields
327    des.position_cost_price = ori.avg_price;
328    des.static_yesterday = ori.yesterday_position;
329    //  des.open_volume // not enough data to derive this field; left at 0
330    //  des.frozen_yesterday // not enough data to derive this field; left at 0
331    //  des.frozen_total // not enough data to derive this field; left at 0
332}
333
334inline void from_xtp(const XTPQueryAssetRsp &ori, Asset &des) { des.avail = ori.buying_power; } // only `avail` is filled, from XTP buying_power
335
336inline void from_xtp(const XTPTickByTickStruct &ori, Entrust &des) { // Fill a kungfu Entrust from an XTP tick-by-tick order record.
337    from_xtp(ori.exchange_id, des.exchange_id);
338    des.instrument_id = ori.ticker;
339    des.data_time = nsec_from_xtp_timestamp(ori.data_time);
340
341    des.price = ori.entrust.price;
342    des.volume = ori.entrust.qty;
343    des.main_seq = ori.entrust.channel_no;
344    des.seq = ori.entrust.seq;
345
346    if (ori.entrust.ord_type == '1') {
347        des.price_type = PriceType::Any;
348    } else if (ori.entrust.ord_type == '2') {
349        des.price_type = PriceType::Limit;
350    } else if (ori.entrust.ord_type == 'U') {
351        des.price_type = PriceType::ForwardBest;
352    } // any other ord_type leaves price_type unchanged
353
354    // XTP: on SZE order_no is documented as meaningless (occasionally 0) and seq is the real order number; on SSE order_no is the order number
355    if (strcmp(des.exchange_id, "SSE") == 0) { // strcmp returns 0 on a match; the bare truth test previously inverted the two branches
356        des.orig_order_no = ori.entrust.order_no;
357    } else {
358        des.orig_order_no = ori.entrust.seq;
359    }
360
361    switch (ori.entrust.side) { // 'B'/'1' are buys and 'S'/'2' are sells
362    case 'B': {
363        des.side = Side::Buy;
364        break;
365    }
366    case 'S': {
367        des.side = Side::Sell;
368        break;
369    }
370    case '1': {
371        des.side = Side::Buy;
372        break;
373    }
374    case '2': {
375        des.side = Side::Sell;
376        break;
377    }
378    default: {
379        des.side = Side::Unknown;
380        break;
381    }
382    }
383}
384
385inline void from_xtp(const XTPTickByTickStruct &ori, Transaction &des) { // Fill a kungfu Transaction from an XTP tick-by-tick record (entrust or trade).
386    from_xtp(ori.exchange_id, des.exchange_id);
387    des.instrument_id = ori.ticker;
388    des.data_time = nsec_from_xtp_timestamp(ori.data_time);
389
390    if (ori.type == XTP_TBT_ENTRUST) { // an entrust record routed here is always emitted with ExecType::Cancel
391        des.instrument_id = ori.ticker; // repeats the assignment made before the branch
392        des.data_time = nsec_from_xtp_timestamp(ori.data_time); // repeats the assignment made before the branch
393
394        des.main_seq = ori.entrust.channel_no;
395        des.seq = ori.entrust.seq;
396
397        des.price = ori.entrust.price;
398        des.volume = ori.entrust.qty;
399
400        if (ori.entrust.side == 'B') {
401            des.side = Side::Buy;
402            des.bid_no = ori.entrust.order_no;
403        } else { // every side code other than 'B' is treated as a sell
404            des.side = Side::Sell;
405            des.ask_no = ori.entrust.order_no;
406        }
407        des.exec_type = ExecType::Cancel;
408
409    } else { // every other record type is read through ori.trade
410
411        des.main_seq = ori.trade.channel_no;
412        des.seq = ori.trade.seq;
413
414        des.price = ori.trade.price;
415        des.volume = ori.trade.qty;
416
417        des.bid_no = ori.trade.bid_no;
418        des.ask_no = ori.trade.ask_no;
419
420        switch (ori.trade.trade_flag) {
421        case 'B': {
422            des.side = Side::Buy;
423            des.exec_type = ExecType::Trade;
424            break;
425        }
426        case 'S': {
427            des.side = Side::Sell;
428            des.exec_type = ExecType::Trade;
429            break;
430        }
431        case 'N': {
432            des.side = Side::Unknown;
433            des.exec_type = ExecType::Trade;
434            break;
435        }
436        case '4': { // flag '4' is mapped to a cancel; the side is inferred from which order number is smaller
437            des.side = (des.bid_no < des.ask_no) ? Side::Sell : Side::Buy;
438            des.exec_type = ExecType::Cancel;
439            break;
440        }
441        case 'F': { // flag 'F' is mapped to a trade; the side is inferred from which order number is smaller
442            des.side = (des.bid_no < des.ask_no) ? Side::Sell : Side::Buy;
443            des.exec_type = ExecType::Trade;
444            break;
445        }
446        default: { // unrecognised flags leave side and exec_type unchanged
447            break;
448        }
449        }
450    }
451}
452} // namespace kungfu::wingchun::xtp
453#endif // KUNGFU_XTP_EXT_TYPE_CONVERT_H

buffer_data.h文件

封装xtp回调函数参数, 方便落地原始数据到journal

 1#ifndef XTP_BUFFER_DATA_H
 2#define XTP_BUFFER_DATA_H
 3
 4#include "serialize_xtp.h"
 5
 6static constexpr int32_t kXTPOrderInfoType = 12340001; // NOTE(review): these constants look like message-type ids for the buffered XTP callback records written to the journal - confirm at the write sites
 7static constexpr int32_t kXTPTradeReportType = 12340002;
 8static constexpr int32_t kQueryXTPOrderInfoType = 12340003;
 9static constexpr int32_t kQueryXTPTradeReportType = 12340004;
10static constexpr int32_t kCancelOrderErrorType = 12340005;
11
12static constexpr int32_t kQueryAssetType = 12340011; // the query ids for asset/position use a separate 1234001x range
13static constexpr int32_t kQueryPositionType = 12340012;
14
15struct BufferXTPTradeReport { // by-value bundle whose fields match the TraderXTP::OnQueryTrade parameters
16    XTPQueryTradeRsp trade_info;
17    XTPRI error_info;
18    int request_id;
19    bool is_last;
20    uint64_t session_id;
21};
22
23struct BufferXTPOrderInfo { // by-value bundle of an XTP order record, its error info and the query bookkeeping fields
24    XTPOrderInfo order_info;
25    XTPRI error_info;
26    int request_id;
27    bool is_last;
28    uint64_t session_id;
29};
30
31struct BufferXTPOrderCancelInfo { // by-value bundle whose fields match the TraderXTP::OnCancelOrderError parameters
32    XTPOrderCancelInfo cancel_info;
33    XTPRI error_info;
34    uint64_t session_id;
35};
36
37struct BufferXTPQueryAssetRsp { // by-value bundle whose fields match the TraderXTP::OnQueryAsset parameters
38    XTPQueryAssetRsp asset;
39    XTPRI error_info;
40    int request_id;
41    bool is_last;
42    uint64_t session_id;
43};
44
45struct BufferXTPQueryStkPositionRsp { // by-value bundle whose fields match the TraderXTP::OnQueryPosition parameters
46    XTPQueryStkPositionRsp position;
47    XTPRI error_info;
48    int request_id;
49    bool is_last;
50    uint64_t session_id;
51};
52
53namespace nlohmann { // JSON (de)serialisers for the buffer structs; the nested XTP members need their own to_json/from_json overloads
54NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(BufferXTPTradeReport, trade_info, session_id, error_info, request_id, is_last);
55NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(BufferXTPOrderInfo, order_info, session_id, error_info, request_id, is_last);
56NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(BufferXTPOrderCancelInfo, cancel_info, error_info, session_id);
57NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(BufferXTPQueryAssetRsp, asset, error_info, session_id, request_id, is_last);
58NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE(BufferXTPQueryStkPositionRsp, position, error_info, session_id, request_id, is_last);
59
60} // namespace nlohmann
61
62#endif // XTP_BUFFER_DATA_H

trader_xtp.h文件

xtp的交易柜台头文件定义

  1#ifndef KUNGFU_XTP_EXT_TRADER_H
  2#define KUNGFU_XTP_EXT_TRADER_H
  3
  4#include <kungfu/wingchun/broker/trader.h>
  5#include <xtp_trader_api.h>
  6
  7namespace kungfu::wingchun::xtp {
  8using namespace kungfu::longfist;
  9using namespace kungfu::longfist::types;
 10
 11struct TDConfiguration { // trading-gateway settings; populated from JSON by from_json below
 12int client_id;
 13std::string account_id;
 14std::string password;
 15std::string software_key;
 16std::string td_ip;
 17int td_port;
 18bool sync_external_order; // optional in the JSON input
 19bool recover_order_trade; // optional in the JSON input
 20};
 21
 22inline void from_json(const nlohmann::json &j, TDConfiguration &c) { // Populate a TDConfiguration from its JSON settings object.
 23j.at("client_id").get_to(c.client_id); // at() throws when a key is missing, so the first six keys are mandatory
 24j.at("account_id").get_to(c.account_id);
 25j.at("password").get_to(c.password);
 26j.at("software_key").get_to(c.software_key);
 27j.at("td_ip").get_to(c.td_ip);
 28j.at("td_port").get_to(c.td_port);
 29c.sync_external_order = j.value<bool>("sync_external_order", false); // value() falls back to the given default when the key is absent
 30c.recover_order_trade = j.value<bool>("recover_order_trade", true); // NOTE(review): falls back to true here, while the td settings JSON declares "default": false - confirm which is intended
 31}
 32
 33class TraderXTP : public XTP::API::TraderSpi, public broker::Trader {
 34public:
 35explicit TraderXTP(broker::BrokerVendor &vendor);
 36
 37~TraderXTP() override;
 38
 39[[nodiscard]] longfist::enums::AccountType get_account_type() const override {
 40    return longfist::enums::AccountType::Stock;
 41}
 42
 43void pre_start() override;
 44
 45void on_start() override;
 46
 47void on_exit() override;
 48
 49bool insert_order(const event_ptr &event) override;
 50
 51bool cancel_order(const event_ptr &event) override;
 52
 53bool req_position() override;
 54
 55bool on_custom_event(const event_ptr &event) override;
 56
 57bool req_account() override;
 58
 59bool req_history_order(const event_ptr &event) override;
 60
 61bool req_history_trade(const event_ptr &event) override;
 62
 63void on_recover() override;
 64
 65/// 当客户端的某个连接与交易后台通信连接断开时,该方法被调用。
 66///@param reason 错误原因,请与错误代码表对应
 67///@param session_id 资金账户对应的session_id,登录时得到
 68///@remark
 69/// 用户主动调用logout导致的断线,不会触发此函数。api不会自动重连,当断线发生时,请用户自行选择后续操作,可以在此函数中调用Login重新登录,并更新session_id,此时用户收到的数据跟断线之前是连续的
 70void OnDisconnected(uint64_t session_id, int reason) override;
 71
 72/// 错误应答
 73///@param error_info
 74/// 当服务器响应发生错误时的具体的错误代码和错误信息,当error_info为空,或者error_info.error_id为0时,表明没有错误
 75///@remark 此函数只有在服务器发生错误时才会调用,一般无需用户处理
 76void OnError(XTPRI *error_info) override{};
 77
 78/// 报单通知
 79///@param order_info 订单响应具体信息,用户可以通过order_info.order_xtp_id来管理订单,通过GetClientIDByXTPID() ==
 80///  client_id来过滤自己的订单,order_info.qty_left字段在订单为未成交、部成、全成、废单状态时,表示此订单还没有成交的数量,在部撤、全撤状态时,表示此订单被撤的数量。order_info.order_cancel_xtp_id为其所对应的撤单ID,不为0时表示此单被撤成功
 81///@param error_info
 82/// 订单被拒绝或者发生错误时错误代码和错误信息,当error_info为空,或者error_info.error_id为0时,表明没有错误
 83///@remark
 84/// 每次订单状态更新时,都会被调用,需要快速返回,否则会堵塞后续消息,当堵塞严重时,会触发断线,在订单未成交、全部成交、全部撤单、部分撤单、已拒绝这些状态时会有响应,对于部分成交的情况,请由订单的成交回报来自行确认。所有登录了此用户的客户端都将收到此用户的订单响应
 85void OnOrderEvent(XTPOrderInfo *order_info, XTPRI *error_info, uint64_t session_id) override;
 86
 87/// 成交通知
 88///@param trade_info 成交回报的具体信息,用户可以通过trade_info.order_xtp_id来管理订单,通过GetClientIDByXTPID() ==
 89///  client_id来过滤自己的订单。对于上交所,exec_id可以唯一标识一笔成交。当发现2笔成交回报拥有相同的exec_id,则可以认为此笔交易自成交了。对于深交所,exec_id是唯一的,暂时无此判断机制。report_index+market字段可以组成唯一标识表示成交回报。
 90///@remark
 91/// 订单有成交发生的时候,会被调用,需要快速返回,否则会堵塞后续消息,当堵塞严重时,会触发断线。所有登录了此用户的客户端都将收到此用户的成交回报。相关订单为部成状态,需要用户通过成交回报的成交数量来确定,OnOrderEvent()不会推送部成状态。
 92void OnTradeEvent(XTPTradeReport *trade_info, uint64_t session_id) override;
 93
 94/// 撤单出错响应
 95///@param cancel_info 撤单具体信息,包括撤单的order_cancel_xtp_id和待撤单的order_xtp_id
 96///@param error_info
 97/// 撤单被拒绝或者发生错误时错误代码和错误信息,需要快速返回,否则会堵塞后续消息,当堵塞严重时,会触发断线,当error_info为空,或者error_info.error_id为0时,表明没有错误
 98///@remark 此响应只会在撤单发生错误时被回调
 99void OnCancelOrderError(XTPOrderCancelInfo *cancel_info, XTPRI *error_info, uint64_t session_id) override;
100
101/// 请求查询报单响应
102///@param order_info 查询到的一个报单
103///@param error_info
104/// 查询报单时发生错误时,返回的错误信息,当error_info为空,或者error_info.error_id为0时,表明没有错误
105///@param request_id 此消息响应函数对应的请求ID
106///@param is_last
107/// 此消息响应函数是否为request_id这条请求所对应的最后一个响应,当为最后一个的时候为true,如果为false,表示还有其他后续消息响应
108///@remark
109/// 由于支持分时段查询,一个查询请求可能对应多个响应,需要快速返回,否则会堵塞后续消息,当堵塞严重时,会触发断线
110void OnQueryOrder(XTPQueryOrderRsp *order_info, XTPRI *error_info, int request_id, bool is_last,
111                    uint64_t session_id) override;
112
113/// 请求查询成交响应
114///@param trade_info 查询到的一个成交回报
115///@param error_info
116/// 查询成交回报发生错误时返回的错误信息,当error_info为空,或者error_info.error_id为0时,表明没有错误
117///@param request_id 此消息响应函数对应的请求ID
118///@param is_last
119/// 此消息响应函数是否为request_id这条请求所对应的最后一个响应,当为最后一个的时候为true,如果为false,表示还有其他后续消息响应
120///@remark
121/// 由于支持分时段查询,一个查询请求可能对应多个响应,需要快速返回,否则会堵塞后续消息,当堵塞严重时,会触发断线
122void OnQueryTrade(XTPQueryTradeRsp *trade_info, XTPRI *error_info, int request_id, bool is_last,
123                    uint64_t session_id) override;
124
125/// 请求查询投资者持仓响应
126///@param position 查询到的一只股票的持仓情况
127///@param error_info
128/// 查询账户持仓发生错误时返回的错误信息,当error_info为空,或者error_info.error_id为0时,表明没有错误
129///@param request_id 此消息响应函数对应的请求ID
130///@param is_last
131/// 此消息响应函数是否为request_id这条请求所对应的最后一个响应,当为最后一个的时候为true,如果为false,表示还有其他后续消息响应
132///@remark
133/// 由于用户可能持有多个股票,一个查询请求可能对应多个响应,需要快速返回,否则会堵塞后续消息,当堵塞严重时,会触发断线
134void OnQueryPosition(XTPQueryStkPositionRsp *position, XTPRI *error_info, int request_id, bool is_last,
135                    uint64_t session_id) override;
136
137/// 请求查询资金账户响应,需要快速返回,否则会堵塞后续消息,当堵塞严重时,会触发断线
138///@param asset 查询到的资金账户情况
139///@param error_info
140/// 查询资金账户发生错误时返回的错误信息,当error_info为空,或者error_info.error_id为0时,表明没有错误
141///@param request_id 此消息响应函数对应的请求ID
142///@param is_last
143/// 此消息响应函数是否为request_id这条请求所对应的最后一个响应,当为最后一个的时候为true,如果为false,表示还有其他后续消息响应
144///@remark 需要快速返回,否则会堵塞后续消息,当堵塞严重时,会触发断线
145void OnQueryAsset(XTPQueryAssetRsp *asset, XTPRI *error_info, int request_id, bool is_last,
146                    uint64_t session_id) override;
147
148/// 请求查询分级基金信息响应,需要快速返回,否则会堵塞后续消息,当堵塞严重时,会触发断线
149///@param fund_info 查询到的分级基金情况
150///@param error_info
151/// 查询分级基金发生错误时返回的错误信息,当error_info为空,或者error_info.error_id为0时,表明没有错误
152///@param request_id 此消息响应函数对应的请求ID
153///@param is_last
154/// 此消息响应函数是否为request_id这条请求所对应的最后一个响应,当为最后一个的时候为true,如果为false,表示还有其他后续消息响应
155///@remark 需要快速返回,否则会堵塞后续消息,当堵塞严重时,会触发断线
156void OnQueryStructuredFund(XTPStructuredFundInfo *fund_info, XTPRI *error_info, int request_id, bool is_last,
157                            uint64_t session_id) override{};
158
159/// 请求查询资金划拨订单响应,需要快速返回,否则会堵塞后续消息,当堵塞严重时,会触发断线
160///@param fund_transfer_info 查询到的资金账户情况
161///@param error_info
162/// 查询资金账户发生错误时返回的错误信息,当error_info为空,或者error_info.error_id为0时,表明没有错误
163///@param request_id 此消息响应函数对应的请求ID
164///@param is_last
165/// 此消息响应函数是否为request_id这条请求所对应的最后一个响应,当为最后一个的时候为true,如果为false,表示还有其他后续消息响应
166///@remark 需要快速返回,否则会堵塞后续消息,当堵塞严重时,会触发断线
167void OnQueryFundTransfer(XTPFundTransferNotice *fund_transfer_info, XTPRI *error_info, int request_id, bool is_last,
168                        uint64_t session_id) override{};
169
170/// 资金划拨通知
171///@param fund_transfer_info
172/// 资金划拨通知的具体信息,用户可以通过fund_transfer_info.serial_id来管理订单,通过GetClientIDByXTPID() ==
173///  client_id来过滤自己的订单。
174///@param error_info
175/// 资金划拨订单被拒绝或者发生错误时错误代码和错误信息,当error_info为空,或者error_info.error_id为0时,表明没有错误
176///@remark
177/// 当资金划拨订单有状态变化的时候,会被调用,需要快速返回,否则会堵塞后续消息,当堵塞严重时,会触发断线。所有登录了此用户的客户端都将收到此用户的资金划拨通知。
178void OnFundTransfer(XTPFundTransferNotice *fund_transfer_info, XTPRI *error_info, uint64_t session_id) override{};
179
180/// 请求查询ETF清单文件的响应,需要快速返回,否则会堵塞后续消息,当堵塞严重时,会触发断线
181///@param etf_info 查询到的ETF清单文件情况
182///@param error_info
183/// 查询ETF清单文件发生错误时返回的错误信息,当error_info为空,或者error_info.error_id为0时,表明没有错误
184///@param request_id 此消息响应函数对应的请求ID
185///@param is_last
186/// 此消息响应函数是否为request_id这条请求所对应的最后一个响应,当为最后一个的时候为true,如果为false,表示还有其他后续消息响应
187///@remark 需要快速返回,否则会堵塞后续消息,当堵塞严重时,会触发断线
188void OnQueryETF(XTPQueryETFBaseRsp *etf_info, XTPRI *error_info, int request_id, bool is_last,
189                uint64_t session_id) override{};
190
191/// 请求查询ETF股票篮的响应,需要快速返回,否则会堵塞后续消息,当堵塞严重时,会触发断线
192///@param etf_component_info 查询到的ETF合约的相关成分股信息
193///@param error_info
194/// 查询ETF股票篮发生错误时返回的错误信息,当error_info为空,或者error_info.error_id为0时,表明没有错误
195///@param request_id 此消息响应函数对应的请求ID
196///@param is_last
197/// 此消息响应函数是否为request_id这条请求所对应的最后一个响应,当为最后一个的时候为true,如果为false,表示还有其他后续消息响应
198///@remark 需要快速返回,否则会堵塞后续消息,当堵塞严重时,会触发断线
199void OnQueryETFBasket(XTPQueryETFComponentRsp *etf_component_info, XTPRI *error_info, int request_id, bool is_last,
200                        uint64_t session_id) override{};
201
202/// 请求查询今日新股申购信息列表的响应,需要快速返回,否则会堵塞后续消息,当堵塞严重时,会触发断线
203///@param ipo_info 查询到的今日新股申购的一只股票信息
204///@param error_info
205/// 查询今日新股申购信息列表发生错误时返回的错误信息,当error_info为空,或者error_info.error_id为0时,表明没有错误
206///@param request_id 此消息响应函数对应的请求ID
207///@param is_last
208/// 此消息响应函数是否为request_id这条请求所对应的最后一个响应,当为最后一个的时候为true,如果为false,表示还有其他后续消息响应
209///@remark 需要快速返回,否则会堵塞后续消息,当堵塞严重时,会触发断线
210void OnQueryIPOInfoList(XTPQueryIPOTickerRsp *ipo_info, XTPRI *error_info, int request_id, bool is_last,
211                        uint64_t session_id) override{};
212
213/// 请求查询用户新股申购额度信息的响应,需要快速返回,否则会堵塞后续消息,当堵塞严重时,会触发断线
214///@param quota_info 查询到的用户某个市场的今日新股申购额度信息
215///@param error_info
216/// 查询用户新股申购额度信息发生错误时返回的错误信息,当error_info为空,或者error_info.error_id为0时,表明没有错误
217///@param request_id 此消息响应函数对应的请求ID
218///@param is_last
219/// 此消息响应函数是否为request_id这条请求所对应的最后一个响应,当为最后一个的时候为true,如果为false,表示还有其他后续消息响应
220///@remark 需要快速返回,否则会堵塞后续消息,当堵塞严重时,会触发断线
221void OnQueryIPOQuotaInfo(XTPQueryIPOQuotaRsp *quota_info, XTPRI *error_info, int request_id, bool is_last,
222                        uint64_t session_id) override{};
223
224private:
225TDConfiguration config_{};
226XTP::API::TraderApi *api_{};
227uint64_t session_id_{};
228int request_id_{};
229int get_request_id() { return ++request_id_; }
230std::string trading_day_{};
231
232std::unordered_map<uint64_t, uint64_t> map_kf_to_xtp_order_id_{};
233std::unordered_map<uint64_t, uint64_t> map_xtp_to_kf_order_id_{};
234std::unordered_map<uint64_t, uint64_t> map_request_location_{};
235std::unordered_map<uint64_t, std::unordered_set<std::string>> map_xtp_order_id_to_xtp_trader_ids_{};
236std::unordered_map<uint64_t, std::vector<XTPTradeReport>> map_xtp_order_id_to_XTPTradeReports_{};
237std::unordered_map<uint64_t, int64_t> map_xtp_order_id_to_traded_volume_{};
238std::unordered_map<uint64_t, std::queue<uint64_t>> map_xtp_order_id_to_action_ids_{};
239
240yijinjing::journal::writer_ptr get_history_writer(uint64_t request_id);
241
242bool custom_OnCancelOrderError(const event_ptr &event);
243bool custom_OnOrderEvent(const event_ptr &event);
244bool custom_OnTradeEvent(const event_ptr &event);
245bool custom_OnQueryOrder(const event_ptr &event);
246bool custom_OnQueryTrade(const event_ptr &event);
247bool custom_OnQueryAsset(const event_ptr &event);
248bool custom_OnQueryPosition(const event_ptr &event);
249
250bool custom_OnCancelOrderError(const XTPOrderCancelInfo &cancel_info, const XTPRI &error_info, uint64_t session_id);
251bool custom_OnOrderEvent(const XTPOrderInfo &order_info, const XTPRI &error_info, uint64_t session_id);
252bool custom_OnTradeEvent(const XTPTradeReport &trade_info, uint64_t session_id);
253bool custom_OnQueryOrder(const XTPOrderInfo &order_info, const XTPRI &error_info, int request_id, bool is_last,
254                        uint64_t session_id);
255bool custom_OnQueryTrade(const XTPTradeReport &trade_info, const XTPRI &error_info, int request_id, bool is_last,
256                        uint64_t session_id);
257bool custom_OnQueryAsset(const XTPQueryAssetRsp &asset, const XTPRI &error_info, int request_id, bool is_last,
258                        uint64_t session_id);
259bool custom_OnQueryPosition(const XTPQueryStkPositionRsp &position, const XTPRI &error_info, int request_id,
260                            bool is_last, uint64_t session_id);
261
262void try_deal_XTPTradeReport(uint64_t xtp_order_id);
263bool generate_external_order(const XTPOrderInfo &order_info);
264
265void add_XTPTradeReport(const XTPTradeReport &trade_info);
266bool has_dealt_trade(uint64_t xtp_order_id, const std::string &exec_id);
267void add_dealt_trade(uint64_t xtp_order_id, const std::string &exec_id);
268
269void add_traded_volume(uint64_t order_xtp_id, int64_t trade_volume);
270int64_t get_traded_volume(uint64_t order_xtp_id);
271
272void add_action_id(uint64_t xtp_order_id, int64_t action_id);
273uint64_t get_action_id(uint64_t xtp_order_id);
274
275void req_order_trade();
276void try_ready();
277bool req_order_over_{false};
278bool req_trade_over_{false};
279};
280} // namespace kungfu::wingchun::xtp
281#endif // KUNGFU_XTP_EXT_TRADER_H

trader_xtp.cpp文件

xtp的交易柜台函数实现

  1#include "trader_xtp.h"
  2#include "buffer_data.h"
  3#include "serialize_xtp.h"
  4#include "type_convert.h"
  5#include <algorithm>
  6
  7namespace kungfu::wingchun::xtp {
  8using namespace kungfu::yijinjing::data;
  9using namespace kungfu::yijinjing;
 10
 11TraderXTP::TraderXTP(broker::BrokerVendor &vendor) : Trader(vendor) {
 12    KUNGFU_SETUP_LOG();
 13    SPDLOG_DEBUG("arguments: {}", get_vendor().get_arguments());
 14}
 15
 16TraderXTP::~TraderXTP() {
 17    if (api_ != nullptr) {
 18        api_->Release();
 19    }
 20}
 21
 22void TraderXTP::pre_start() {
 23    config_ = nlohmann::json::parse(get_config());
 24    SPDLOG_INFO("config: {}", get_config());
 25    if (not config_.recover_order_trade) {
 26        disable_recover();
 27    }
 28}
 29
 30void TraderXTP::on_start() {
 31    if (config_.client_id < 1 or config_.client_id > 99) {
 32        SPDLOG_ERROR("client_id must between 1 and 99");
 33    }
 34    std::string runtime_folder = get_runtime_folder();
 35    SPDLOG_INFO("Connecting XTP account {} with tcp://{}:{}", config_.account_id, config_.td_ip, config_.td_port);
 36    api_ = XTP::API::TraderApi::CreateTraderApi(config_.client_id, runtime_folder.c_str());
 37    api_->RegisterSpi(this);
 38    api_->SubscribePublicTopic(XTP_TERT_QUICK);
 39    api_->SetSoftwareVersion("1.1.0");
 40    api_->SetSoftwareKey(config_.software_key.c_str());
 41    session_id_ = api_->Login(config_.td_ip.c_str(), config_.td_port, config_.account_id.c_str(),
 42                            config_.password.c_str(), XTP_PROTOCOL_TCP);
 43    if (session_id_ > 0) {
 44        SPDLOG_INFO("Login successfully");
 45        req_order_trade();
 46    } else {
 47        update_broker_state(BrokerState::LoginFailed);
 48        SPDLOG_ERROR("Login failed [{}]: {}", api_->GetApiLastError()->error_id, api_->GetApiLastError()->error_msg);
 49    }
 50}
 51
 52void TraderXTP::on_exit() {
 53    if (api_ != nullptr and session_id_ > 0) {
 54        auto result = api_->Logout(session_id_);
 55        SPDLOG_INFO("Logout with return code {}", result);
 56    }
 57}
 58
 59bool TraderXTP::insert_order(const event_ptr &event) {
 60    const OrderInput &input = event->data<OrderInput>();
 61    SPDLOG_DEBUG("OrderInput: {}", input.to_string());
 62    XTPOrderInsertInfo xtp_input = {};
 63    to_xtp(xtp_input, input);
 64
 65    SPDLOG_DEBUG("XTPOrderInsertInfo: {}", to_string(xtp_input));
 66    uint64_t order_xtp_id = api_->InsertOrder(&xtp_input, session_id_);
 67    auto success = order_xtp_id != 0;
 68
 69    auto nano = yijinjing::time::now_in_nano();
 70    auto writer = get_writer(event->source());
 71    Order &order = writer->open_data<Order>(event->gen_time());
 72    order_from_input(input, order);
 73    order.external_order_id = std::to_string(order_xtp_id).c_str();
 74    order.insert_time = nano;
 75    order.update_time = nano;
 76
 77    if (success) {
 78        map_kf_to_xtp_order_id_.emplace(uint64_t(input.order_id), order_xtp_id);
 79        map_xtp_to_kf_order_id_.emplace(order_xtp_id, uint64_t(input.order_id));
 80    } else {
 81        auto error_info = api_->GetApiLastError();
 82        order.error_id = error_info->error_id;
 83        order.error_msg = error_info->error_msg;
 84        order.status = OrderStatus::Error;
 85    }
 86
 87    SPDLOG_DEBUG("Order: {}", order.to_string());
 88    writer->close_data();
 89    if (not success) {
 90        SPDLOG_ERROR("fail to insert order {}, error id {}, {}", to_string(xtp_input), (int)order.error_id,
 91                    order.error_msg);
 92    }
 93    return success;
 94}
 95
 96bool TraderXTP::cancel_order(const event_ptr &event) {
 97    const OrderAction &action = event->data<OrderAction>();
 98    SPDLOG_DEBUG("OrderAction: {}", action.to_string());
 99    auto order_id_iter = map_kf_to_xtp_order_id_.find(action.order_id);
100    if (order_id_iter == map_kf_to_xtp_order_id_.end()) {
101        SPDLOG_ERROR("failed to cancel order {}, can't find related xtp order id", action.order_id);
102        return false;
103    }
104
105    if (not has_order(action.order_id)) {
106        SPDLOG_ERROR("no order_id {} in orders_", action.order_id);
107        return false;
108    }
109
110    auto &order_state = get_order(action.order_id);
111    uint64_t order_xtp_id = order_id_iter->second;
112    add_action_id(order_xtp_id, action.order_action_id);
113    auto xtp_action_id = api_->CancelOrder(order_xtp_id, session_id_);
114    auto success = xtp_action_id != 0;
115
116    if (not success) {
117        XTPRI *error_info = api_->GetApiLastError();
118        SPDLOG_ERROR("failed to cancel order {}, order_xtp_id: {} session_id: {} error_id: {} error_msg: {}",
119                    action.order_id, order_xtp_id, session_id_, error_info->error_id, error_info->error_msg);
120        OrderActionError &error = get_writer(event->source())->open_data<OrderActionError>(now());
121        error.order_id = action.order_id; // 订单ID
122        std::string str_external_order_id = std::to_string(order_xtp_id);
123        error.external_order_id = str_external_order_id.c_str();
124        error.order_action_id = action.order_action_id; // 订单操作ID,
125        error.error_id = xtp_action_id;                 // 错误ID
126        error.error_msg = error_info->error_msg;        // 错误信息
127        error.insert_time = time::now_in_nano();        // 写入时间
128        SPDLOG_DEBUG("OrderActionError: {}", error.to_string());
129        get_writer(event->source())->close_data();
130        return false;
131    }
132
133    if (not is_final_status(order_state.data.status) or order_state.data.status == OrderStatus::Lost) {
134        order_state.data.status = OrderStatus::Cancelling;
135        try_write_to(order_state.data, order_state.dest);
136    }
137    SPDLOG_DEBUG("Order: {}", order_state.data.to_string());
138    return success;
139}
140
141bool TraderXTP::req_position() {
142    SPDLOG_INFO("req_position");
143    return api_->QueryPosition(nullptr, session_id_, get_request_id()) == 0;
144}
145
146bool TraderXTP::req_account() {
147    SPDLOG_INFO("req_account");
148    return api_->QueryAsset(session_id_, get_request_id()) == 0;
149}
150
151void TraderXTP::OnDisconnected(uint64_t session_id, int reason) {
152    if (session_id == session_id_) {
153        update_broker_state(BrokerState::DisConnected);
154        SPDLOG_ERROR("disconnected, reason: {}", reason);
155    }
156}
157
158void TraderXTP::OnOrderEvent(XTPOrderInfo *order_info, XTPRI *error_info, uint64_t session_id) {
159    if (nullptr == order_info) {
160        SPDLOG_ERROR("XTPOrderInfo is nullptr");
161        return;
162    }
163    SPDLOG_DEBUG("XTPOrderInfo: {}", to_string(*order_info));
164    auto &bf_order_info = get_thread_writer()->open_custom_data<BufferXTPOrderInfo>(kXTPOrderInfoType, now());
165    memcpy(&bf_order_info.order_info, order_info, sizeof(XTPOrderInfo));
166    bf_order_info.session_id = session_id;
167    if (error_info != nullptr) {
168        memcpy(&bf_order_info.error_info, error_info, sizeof(XTPRI));
169    } else {
170        memset(&bf_order_info.error_info, 0, sizeof(XTPRI));
171    }
172    SPDLOG_DEBUG("BufferXTPOrderInfo: {}", to_string(bf_order_info));
173    get_thread_writer()->close_data();
174}
175
176bool TraderXTP::custom_OnOrderEvent(const event_ptr &event) {
177    const auto *bf_order_info = reinterpret_cast<const BufferXTPOrderInfo *>(event->data_address());
178    return custom_OnOrderEvent(bf_order_info->order_info, bf_order_info->error_info, bf_order_info->session_id);
179}
180
181bool TraderXTP::custom_OnOrderEvent(const XTPOrderInfo &order_info, const XTPRI &error_info, uint64_t session_id) {
182    SPDLOG_DEBUG("XTPOrderInfo: {}", to_string(order_info));
183    SPDLOG_DEBUG("session_id: {}, XTPRI: {}", session_id, to_string(error_info));
184
185    auto order_xtp_id_iter = map_xtp_to_kf_order_id_.find(order_info.order_xtp_id);
186    if (order_xtp_id_iter == map_xtp_to_kf_order_id_.end()) {
187        SPDLOG_WARN("unrecognized order_xtp_id {}@{}", order_info.order_xtp_id, trading_day_);
188        return generate_external_order(order_info);
189    }
190
191    uint64_t kf_order_id = order_xtp_id_iter->second;
192    if (not has_order(kf_order_id)) {
193        return generate_external_order(order_info);
194    }
195
196    auto &order_state = get_order(kf_order_id);
197    if (not is_final_status(order_state.data.status) or order_state.data.status == OrderStatus::Lost) {
198        from_xtp_no_price_type(order_info, order_state.data);
199        order_state.data.update_time = yijinjing::time::now_in_nano();
200        if (error_info.error_id != 0) {
201            order_state.data.error_id = error_info.error_id;
202            order_state.data.error_msg = error_info.error_msg;
203        }
204        try_write_to(order_state.data, order_state.dest);
205        SPDLOG_DEBUG("Order: {}", order_state.data.to_string());
206        try_deal_XTPTradeReport(order_info.order_xtp_id);
207    }
208    return true;
209}
210
211bool TraderXTP::generate_external_order(const XTPOrderInfo &order_info) {
212    SPDLOG_DEBUG("XTPOrderInfo: {}", to_string(order_info));
213    static const std::unordered_set<int> set_cancel_enum = {
214        XTP_ORDER_SUBMIT_STATUS_TYPE::XTP_ORDER_SUBMIT_STATUS_CANCEL_SUBMITTED, //
215        XTP_ORDER_SUBMIT_STATUS_TYPE::XTP_ORDER_SUBMIT_STATUS_CANCEL_REJECTED,  //
216        XTP_ORDER_SUBMIT_STATUS_TYPE::XTP_ORDER_SUBMIT_STATUS_CANCEL_ACCEPTED   //
217    };
218
219    if (not config_.sync_external_order) {
220        return false;
221    }
222
223    if (set_cancel_enum.find(order_info.order_submit_status) != set_cancel_enum.end()) {
224        SPDLOG_DEBUG("this XTPOrderInfo is xtp cancel order, do not generate kungfu Order");
225        return false;
226    }
227
228    auto writer = get_public_writer();
229    auto nano = yijinjing::time::now_in_nano();
230    Order &order = writer->open_data<Order>(now());
231    order.order_id = writer->current_frame_uid();
232    from_xtp(order_info, order);
233    order.insert_time = nsec_from_xtp_timestamp(order_info.insert_time);
234    order.update_time = nano;
235    map_kf_to_xtp_order_id_.emplace(uint64_t(order.order_id), order_info.order_xtp_id);
236    map_xtp_to_kf_order_id_.emplace(order_info.order_xtp_id, uint64_t(order.order_id));
237    SPDLOG_DEBUG("Order: {}", order.to_string());
238    writer->close_data();
239    try_deal_XTPTradeReport(order_info.order_xtp_id);
240    return true;
241}
242
243void TraderXTP::OnTradeEvent(XTPTradeReport *trade_info, uint64_t session_id) {
244    if (nullptr == trade_info) {
245        SPDLOG_ERROR("XTPTradeReport is nullptr");
246        return;
247    }
248    SPDLOG_DEBUG("XTPTradeReport: {}", to_string(*trade_info));
249
250    auto &bf_trade_info = get_thread_writer()->open_custom_data<BufferXTPTradeReport>(kXTPTradeReportType, now());
251    memcpy(&bf_trade_info.trade_info, trade_info, sizeof(XTPTradeReport));
252    bf_trade_info.session_id = session_id;
253    SPDLOG_DEBUG("BufferXTPOrderInfo: {}", to_string(bf_trade_info));
254    get_thread_writer()->close_data();
255}
256
257bool TraderXTP::custom_OnTradeEvent(const XTPTradeReport &trade_info, uint64_t session_id) {
258    SPDLOG_DEBUG("XTPTradeReport: {}", to_string(trade_info));
259    SPDLOG_DEBUG("session_id: {}", session_id);
260
261    auto order_xtp_id_iter = map_xtp_to_kf_order_id_.find(trade_info.order_xtp_id);
262    if (order_xtp_id_iter == map_xtp_to_kf_order_id_.end()) {
263        SPDLOG_WARN("unrecognized order_xtp_id {}, store in map_xtp_order_id_to_XTPTradeReports_",
264                    trade_info.order_xtp_id);
265        add_XTPTradeReport(trade_info);
266        return false;
267    }
268
269    if (has_dealt_trade(trade_info.order_xtp_id, trade_info.exec_id)) {
270        SPDLOG_DEBUG("order_xtp_id:{}, exec_id: {}, has dealt", trade_info.order_xtp_id, trade_info.exec_id);
271        return false;
272    }
273
274    uint64_t kf_order_id = order_xtp_id_iter->second;
275    if (not has_order(kf_order_id)) {
276        SPDLOG_ERROR("no order_id {} in orders_", kf_order_id);
277        return false;
278    }
279
280    add_dealt_trade(trade_info.order_xtp_id, trade_info.exec_id);
281    auto &order_state = get_order(kf_order_id);
282
283    if (has_writer(order_state.dest)) {
284        auto writer = get_writer(order_state.dest);
285        Trade &trade = writer->open_data<Trade>(now());
286        from_xtp(trade_info, trade);
287        trade.trade_id = writer->current_frame_uid();
288        trade.order_id = kf_order_id;
289        add_traded_volume(trade_info.order_xtp_id, trade.volume);
290        SPDLOG_DEBUG("Trade: {}", trade.to_string());
291        writer->close_data();
292    } else {
293        Trade trade{};
294        from_xtp(trade_info, trade);
295        trade.trade_id = get_public_writer()->current_frame_uid() xor (time::now_in_nano() & 0xFFFFFFFF);
296        trade.order_id = kf_order_id;
297        add_traded_volume(trade_info.order_xtp_id, trade.volume);
298        SPDLOG_DEBUG("Trade: {}", trade.to_string());
299        try_write_to(trade, order_state.dest);
300    }
301
302    if (not is_final_status(order_state.data.status) or order_state.data.status == OrderStatus::Lost) {
303        order_state.data.volume_left = std::min<int64_t>(
304            order_state.data.volume_left, order_state.data.volume - get_traded_volume(trade_info.order_xtp_id));
305        if (order_state.data.volume_left > 0) {
306            order_state.data.status = OrderStatus::PartialFilledActive;
307        }
308        order_state.data.update_time = now();
309        SPDLOG_DEBUG("Order: {}", order_state.data.to_string());
310        try_write_to(order_state.data, order_state.dest);
311    }
312    return true;
313}
314
315bool TraderXTP::custom_OnTradeEvent(const event_ptr &event) {
316    const auto *bf_trade_info = reinterpret_cast<const BufferXTPTradeReport *>(event->data_address());
317    return custom_OnTradeEvent(bf_trade_info->trade_info, bf_trade_info->session_id);
318}
319
320void TraderXTP::OnCancelOrderError(XTPOrderCancelInfo *cancel_info, XTPRI *error_info, uint64_t session_id) {
321    if (nullptr == cancel_info) {
322        SPDLOG_ERROR("XTPOrderCancelInfo is nullptr");
323        return;
324    }
325    SPDLOG_ERROR("XTPOrderCancelInfo: {}", to_string(*cancel_info));
326
327    auto &bf_order_cancel_info =
328        get_thread_writer()->open_custom_data<BufferXTPOrderCancelInfo>(kCancelOrderErrorType, now());
329    memcpy(&bf_order_cancel_info.cancel_info, cancel_info, sizeof(XTPOrderCancelInfo));
330    bf_order_cancel_info.session_id = session_id;
331    if (error_info != nullptr) {
332        memcpy(&bf_order_cancel_info.error_info, error_info, sizeof(XTPRI));
333    } else {
334        memset(&bf_order_cancel_info.error_info, 0, sizeof(XTPRI));
335    }
336    SPDLOG_DEBUG("BufferXTPOrderInfo: {}", to_string(bf_order_cancel_info));
337    get_thread_writer()->close_data();
338}
339
340bool TraderXTP::custom_OnCancelOrderError(const event_ptr &event) {
341    const auto &bf_order_cancel_info = event->custom_data<BufferXTPOrderCancelInfo>();
342    return custom_OnCancelOrderError(bf_order_cancel_info.cancel_info, bf_order_cancel_info.error_info,
343                                    bf_order_cancel_info.session_id);
344}
345
346bool TraderXTP::custom_OnCancelOrderError(const XTPOrderCancelInfo &cancel_info, const XTPRI &error_info,
347                                        uint64_t session_id) {
348    SPDLOG_DEBUG("XTPOrderCancelInfo: {}", to_string(cancel_info));
349    SPDLOG_DEBUG("session_id: {}, XTPRI: {}", session_id, to_string(error_info));
350
351    uint64_t action_id = get_action_id(cancel_info.order_xtp_id);
352    if (not has_order_action(action_id)) {
353        SPDLOG_WARN("has not related OrderAction of {}:{}", cancel_info.order_xtp_id, action_id);
354        return false;
355    }
356
357    auto action_state = get_order_action(action_id);
358    auto order_id = action_state.data.order_id;
359    if (not has_order(order_id)) {
360        SPDLOG_WARN("order_id not in orders_ {}", order_id);
361        return false;
362    }
363
364    auto order_state = get_order(order_id);
365    if (has_writer(order_state.dest)) {
366        OrderActionError &error = get_writer(order_state.dest)->open_data<OrderActionError>(now());
367        error.order_id = order_state.data.order_id; // 订单ID
368        std::string str_external_order_id = std::to_string(cancel_info.order_xtp_id);
369        error.external_order_id = str_external_order_id.c_str();
370        error.order_action_id = action_id;       // 订单操作ID,
371        error.error_id = error_info.error_id;    // 错误ID
372        error.error_msg = error_info.error_msg;  // 错误信息
373        error.insert_time = time::now_in_nano(); // 写入时间
374        SPDLOG_DEBUG("OrderActionError: {}", error.to_string());
375        get_writer(order_state.dest)->close_data();
376    } else {
377        OrderActionError error{};
378        error.order_id = order_state.data.order_id; // 订单ID
379        std::string str_external_order_id = std::to_string(cancel_info.order_xtp_id);
380        error.external_order_id = str_external_order_id.c_str();
381        error.order_action_id = action_id;       // 订单操作ID,
382        error.error_id = error_info.error_id;    // 错误ID
383        error.error_msg = error_info.error_msg;  // 错误信息
384        error.insert_time = time::now_in_nano(); // 写入时间
385        SPDLOG_DEBUG("OrderActionError: {}", error.to_string());
386        try_write_to(error, order_state.dest);
387    }
388    return true;
389}
390
391void TraderXTP::OnQueryPosition(XTPQueryStkPositionRsp *position, XTPRI *error_info, int request_id, bool is_last,
392                                uint64_t session_id) {
393    if (nullptr == position) {
394        SPDLOG_ERROR("XTPQueryStkPositionRsp is nullptr");
395        return;
396    }
397    SPDLOG_TRACE("XTPQueryStkPositionRsp: {}", to_string(*position));
398
399    auto &bf_position = get_thread_writer()->open_custom_data<BufferXTPQueryStkPositionRsp>(kQueryPositionType);
400    memcpy(&bf_position.position, position, sizeof(XTPQueryStkPositionRsp));
401    if (error_info != nullptr) {
402        memcpy(&bf_position.error_info, error_info, sizeof(XTPRI));
403    } else {
404        memset(&bf_position.error_info, 0, sizeof(XTPRI));
405    }
406    bf_position.request_id = request_id;
407    bf_position.is_last = is_last;
408    bf_position.session_id = session_id;
409    get_thread_writer()->close_data();
410}
411
412bool TraderXTP::custom_OnQueryPosition(const event_ptr &event) {
413    const auto &bf_position = event->custom_data<BufferXTPQueryStkPositionRsp>();
414    return custom_OnQueryPosition(bf_position.position, bf_position.error_info, bf_position.request_id,
415                                bf_position.is_last, bf_position.session_id);
416}
417
418bool TraderXTP::custom_OnQueryPosition(const XTPQueryStkPositionRsp &position, const XTPRI &error_info, int request_id,
419                                    bool is_last, uint64_t session_id) {
420    if (error_info.error_id != 0) {
421        SPDLOG_ERROR("error_id:{}, error_msg: {}, request_id: {}, last: {}", error_info.error_id, error_info.error_msg,
422                    request_id, is_last);
423        return false;
424    }
425
426    SPDLOG_TRACE("XTPQueryStkPositionRsp: {}", to_string(position));
427    auto writer = get_position_writer();
428    Position &stock_pos = writer->open_data<Position>(0);
429    from_xtp(position, stock_pos);
430    stock_pos.holder_uid = get_home_uid();
431    stock_pos.source_id = get_home_uid();
432    stock_pos.instrument_type = get_instrument_type(stock_pos.exchange_id, stock_pos.instrument_id);
433    stock_pos.direction = Direction::Long;
434    stock_pos.update_time = yijinjing::time::now_in_nano();
435    SPDLOG_TRACE("Position: {}", stock_pos.to_string());
436    writer->close_data();
437    if (is_last) {
438        PositionEnd &end = writer->open_data<PositionEnd>(0);
439        end.holder_uid = get_home_uid();
440        writer->close_data();
441        enable_positions_sync();
442    }
443    return true;
444}
445
446void TraderXTP::OnQueryAsset(XTPQueryAssetRsp *asset, XTPRI *error_info, int request_id, bool is_last,
447                            uint64_t session_id) {
448    if (nullptr == asset) {
449        SPDLOG_ERROR("XTPQueryAssetRsp is nullptr");
450        return;
451    }
452    SPDLOG_TRACE("XTPQueryAssetRsp: {}", to_string(*asset));
453
454    auto &bf_asset = get_thread_writer()->open_custom_data<BufferXTPQueryAssetRsp>(kQueryAssetType);
455    memcpy(&bf_asset.asset, asset, sizeof(XTPQueryAssetRsp));
456    if (error_info != nullptr) {
457        memcpy(&bf_asset.error_info, error_info, sizeof(XTPRI));
458    } else {
459        memset(&bf_asset.error_info, 0, sizeof(XTPRI));
460    }
461    bf_asset.request_id = request_id;
462    bf_asset.is_last = is_last;
463    bf_asset.session_id = session_id;
464    get_thread_writer()->close_data();
465}
466
467bool TraderXTP::custom_OnQueryAsset(const event_ptr &event) {
468    const auto &bf_asset = event->custom_data<BufferXTPQueryAssetRsp>();
469    return custom_OnQueryAsset(bf_asset.asset, bf_asset.error_info, bf_asset.request_id, bf_asset.is_last,
470                            bf_asset.session_id);
471}
472
473bool TraderXTP::custom_OnQueryAsset(const XTPQueryAssetRsp &asset, const XTPRI &error_info, int request_id,
474                                    bool is_last, uint64_t session_id) {
475    if (error_info.error_id != 0) {
476        SPDLOG_ERROR("error_id: {}, error_msg: {}, request_id: {}, last: {}", error_info.error_id, error_info.error_msg,
477                    request_id, is_last);
478    }
479
480    if (error_info.error_id == 0 || error_info.error_id == 11000350) {
481        SPDLOG_TRACE("OnQueryAsset: {}", to_string(asset));
482        auto writer = get_asset_writer();
483        Asset &account = writer->open_data<Asset>(0);
484        if (error_info.error_id == 0) {
485            from_xtp(asset, account);
486        }
487        account.holder_uid = get_live_home_uid();
488        account.update_time = yijinjing::time::now_in_nano();
489        SPDLOG_TRACE("Asset: {}", account.to_string());
490        writer->close_data();
491        enable_asset_sync();
492    }
493    return true;
494}
495
496bool TraderXTP::req_history_order(const event_ptr &event) {
497    XTPQueryOrderReq query_param{};
498    int request_id = get_request_id();
499    int ret = api_->QueryOrders(&query_param, session_id_, request_id);
500    if (0 != ret) {
501        SPDLOG_ERROR("QueryOrders False: {}", ret);
502    }
503    map_request_location_.emplace(request_id, event->source());
504    return 0 == ret;
505}
506
507bool TraderXTP::req_history_trade(const event_ptr &event) {
508    XTPQueryTraderReq query_param{};
509    int request_id = get_request_id();
510    int ret = api_->QueryTrades(&query_param, session_id_, request_id);
511    if (0 != ret) {
512        SPDLOG_ERROR("QueryTrades False : {}", ret);
513    }
514    map_request_location_.emplace(request_id, event->source());
515    return 0 == ret;
516}
517
518void TraderXTP::OnQueryOrder(XTPQueryOrderRsp *order_info, XTPRI *error_info, int request_id, bool is_last,
519                            uint64_t session_id) {
520    SPDLOG_DEBUG("request_id: {}, is_last: {}, session_id: {}", request_id, is_last, session_id);
521    auto &bf_order_info = get_thread_writer()->open_custom_data<BufferXTPOrderInfo>(kQueryXTPOrderInfoType, now());
522    if (order_info != nullptr) {
523        memcpy(&bf_order_info.order_info, order_info, sizeof(XTPOrderInfo));
524    } else {
525        memset(&bf_order_info.order_info, 0, sizeof(XTPOrderInfo));
526    }
527    if (error_info != nullptr) {
528        memcpy(&bf_order_info.error_info, error_info, sizeof(XTPRI));
529    } else {
530        memset(&bf_order_info.error_info, 0, sizeof(XTPRI));
531    }
532    bf_order_info.session_id = session_id;
533    bf_order_info.request_id = request_id;
534    bf_order_info.is_last = is_last;
535    SPDLOG_DEBUG("BufferXTPOrderInfo: {}", to_string(bf_order_info));
536    get_thread_writer()->close_frame(sizeof(BufferXTPOrderInfo));
537}
538
539bool TraderXTP::custom_OnQueryOrder(const event_ptr &event) {
540    const auto *bf_order_info = reinterpret_cast<const BufferXTPOrderInfo *>(event->data_address());
541    return custom_OnQueryOrder(bf_order_info->order_info, bf_order_info->error_info, bf_order_info->request_id,
542                            bf_order_info->is_last, bf_order_info->session_id);
543}
544
545bool TraderXTP::custom_OnQueryOrder(const XTPOrderInfo &order_info, const XTPRI &error_info, int request_id,
546                                    bool is_last, uint64_t session_id) {
547    SPDLOG_DEBUG("XTPOrderInfo: {}", to_string(order_info));
548    SPDLOG_DEBUG("XTPRI: {}", to_string(error_info));
549    SPDLOG_DEBUG("request_id: {}, is_last: {}", request_id, is_last);
550
551    // 查询历史流水收到nullptr, 经过journal走一圈后表现形式为 order_xtp_id == 0
552    if (order_info.order_xtp_id == 0 and is_last and
553        map_request_location_.find(request_id) != map_request_location_.end()) {
554        SPDLOG_WARN("XTPQueryOrderRsp* order_info == nullptr, no data returned!");
555        auto writer = get_history_writer(request_id);
556        HistoryOrder &history_order = writer->open_data<HistoryOrder>();
557        history_order.is_last = true;
558        history_order.data_type = HistoryDataType::TotalEnd;
559        const std::string msg = "No order today";
560        history_order.error_msg = msg.c_str();
561        writer->close_data();
562        SPDLOG_DEBUG("HistoryOrder: {}", history_order.to_string());
563        return false;
564    }
565
566    if (map_request_location_.find(request_id) == map_request_location_.end()) {
567        // TD重连收到推送当做普通下单委托响应处理
568        if (is_last) {
569            req_order_over_ = true;
570            try_ready();
571        }
572        return order_info.order_xtp_id != 0 and custom_OnOrderEvent(order_info, error_info, request_id);
573    }
574
575    auto writer = get_history_writer(request_id);
576    HistoryOrder &history_order = writer->open_data<HistoryOrder>();
577
578    if (error_info.error_id != 0) {
579        SPDLOG_ERROR("OnQueryOrder False , error_code : {}, error_msg : {}", error_info.error_id, error_info.error_msg);
580        history_order.error_id = error_info.error_id;
581        history_order.error_msg = error_info.error_msg;
582    }
583
584    from_xtp(order_info, history_order);
585    history_order.order_id = writer->current_frame_uid();
586    history_order.is_last = is_last;
587    history_order.insert_time = yijinjing::time::now_in_nano();
588    history_order.update_time = history_order.insert_time;
589    SPDLOG_DEBUG("HistoryOrder: {}", history_order.to_string());
590    writer->close_data();
591    return true;
592}
593
594yijinjing::journal::writer_ptr TraderXTP::get_history_writer(uint64_t request_id) {
595    return get_writer(map_request_location_.try_emplace(request_id).first->second);
596}
597
598void TraderXTP::OnQueryTrade(XTPQueryTradeRsp *trade_info, XTPRI *error_info, int request_id, bool is_last,
599                            uint64_t session_id) {
600    SPDLOG_DEBUG("request_id: {}, is_last: {}, session_id: {}", request_id, is_last, session_id);
601    auto &bf_trade_info = get_thread_writer()->open_custom_data<BufferXTPTradeReport>(kQueryXTPTradeReportType, now());
602    if (trade_info != nullptr) {
603        memcpy(&bf_trade_info.trade_info, trade_info, sizeof(XTPOrderInfo));
604    } else {
605        memset(&bf_trade_info.trade_info, 0, sizeof(XTPOrderInfo));
606    }
607    if (error_info != nullptr) {
608        memcpy(&bf_trade_info.error_info, error_info, sizeof(XTPRI));
609    } else {
610        memset(&bf_trade_info.error_info, 0, sizeof(XTPRI));
611    }
612    bf_trade_info.session_id = session_id;
613    bf_trade_info.request_id = request_id;
614    bf_trade_info.is_last = is_last;
615    SPDLOG_DEBUG("BufferXTPTradeReport: {}", to_string(bf_trade_info));
616    get_thread_writer()->close_data();
617}
618
619bool TraderXTP::custom_OnQueryTrade(const event_ptr &event) {
620    const auto *bf_trade_info = reinterpret_cast<const BufferXTPTradeReport *>(event->data_address());
621    return custom_OnQueryTrade(bf_trade_info->trade_info, bf_trade_info->error_info, bf_trade_info->request_id,
622                            bf_trade_info->is_last, bf_trade_info->session_id);
623}
624
625bool TraderXTP::custom_OnQueryTrade(const XTPTradeReport &trade_info, const XTPRI &error_info, int request_id,
626                                    bool is_last, uint64_t session_id) {
627    SPDLOG_DEBUG("XTPTradeReport: {}", to_string(trade_info));
628    SPDLOG_DEBUG("XTPRI: {}", to_string(error_info));
629    SPDLOG_DEBUG("request_id: {}, is_last: {}", request_id, is_last);
630
631    // 查询历史流水收到nullptr, 经过journal走一圈后表现形式为 order_xtp_id == 0
632    if (trade_info.order_xtp_id == 0 and is_last and
633        map_request_location_.find(request_id) != map_request_location_.end()) {
634        SPDLOG_WARN("XTPQueryTradeRsp* trade_info == nullptr, no data returned!");
635        auto writer = get_history_writer(request_id);
636        HistoryTrade &history_trade = writer->open_data<HistoryTrade>(now());
637        history_trade.is_last = true;
638        history_trade.data_type = HistoryDataType::TotalEnd;
639        const std::string msg = "No trade today";
640        history_trade.error_msg = msg.c_str();
641        writer->close_data();
642        return false;
643    }
644
645    if (map_request_location_.find(request_id) == map_request_location_.end()) {
646        // TD重连收到推送当做普通交易成交回报推送处理
647        if (is_last) {
648            req_trade_over_ = true;
649            try_ready();
650        }
651        return trade_info.order_xtp_id != 0 and custom_OnTradeEvent(trade_info, session_id);
652    }
653
654    auto writer = get_history_writer(request_id);
655    HistoryTrade &history_trade = writer->open_data<HistoryTrade>(now());
656
657    if (error_info.error_id != 0) {
658        SPDLOG_ERROR("OnQueryTrade False , error_code : {}, error_msg : {}", error_info.error_id, error_info.error_msg);
659        history_trade.error_id = error_info.error_id;
660        history_trade.error_msg = error_info.error_msg;
661    }
662
663    from_xtp(trade_info, history_trade);
664    history_trade.trade_id = writer->current_frame_uid();
665    history_trade.is_last = is_last;
666    history_trade.trade_time = yijinjing::time::now_in_nano();
667    history_trade.instrument_type = get_instrument_type(history_trade.exchange_id, history_trade.instrument_id);
668    SPDLOG_DEBUG("HistoryTrade: {}", history_trade.to_string());
669    writer->close_data();
670    return false;
671}
672
673void TraderXTP::on_recover() {
674    for (auto &pair : get_orders()) {
675        SPDLOG_DEBUG("Order: {}", pair.second.data.to_string());
676        const std::string str_external_order_id = pair.second.data.external_order_id.to_string();
677        if (not str_external_order_id.empty()) {
678            uint64_t order_id = pair.first;
679            uint64_t order_xtp_id = std::stoull(str_external_order_id);
680            map_xtp_to_kf_order_id_.emplace(order_xtp_id, order_id);
681            map_kf_to_xtp_order_id_.emplace(order_id, order_xtp_id);
682        }
683    }
684    for (auto &pair : get_trades()) {
685        SPDLOG_DEBUG("Trade: {}", pair.second.data.to_string());
686        uint64_t order_xtp_id = std::stoull(pair.second.data.external_order_id);
687        map_xtp_order_id_to_xtp_trader_ids_.try_emplace(order_xtp_id)
688            .first->second.emplace(pair.second.data.external_trade_id.to_string());
689    }
690}
691
692void TraderXTP::req_order_trade() {
693    if (disable_recover_) {
694        return try_ready();
695    }
696
697    XTPQueryOrderReq query_order_param{};
698    int ret = api_->QueryOrders(&query_order_param, session_id_, get_request_id());
699    if (0 != ret) {
700        SPDLOG_ERROR("QueryOrders False: {}", ret);
701    }
702
703    XTPQueryTraderReq query_trade_param{};
704    ret = api_->QueryTrades(&query_trade_param, session_id_, get_request_id());
705    if (0 != ret) {
706        SPDLOG_ERROR("QueryTrades False : {}", ret);
707    }
708}
709
710void TraderXTP::try_ready() {
711    if (BrokerState::Ready == get_state()) {
712        return;
713    }
714
715    SPDLOG_DEBUG("req_order_over_: {}, req_trade_over_: {}", req_order_over_, req_trade_over_);
716    if (disable_recover_ or (req_order_over_ and req_trade_over_)) {
717        update_broker_state(BrokerState::Ready);
718    }
719}
720
721void TraderXTP::try_deal_XTPTradeReport(uint64_t xtp_order_id) {
722    auto &xtp_trades = map_xtp_order_id_to_XTPTradeReports_.try_emplace(xtp_order_id).first->second;
723    for (const auto &xtp_trade : xtp_trades) {
724        custom_OnTradeEvent(xtp_trade, session_id_);
725    }
726    xtp_trades.clear();
727}
728
729void TraderXTP::add_XTPTradeReport(const XTPTradeReport &trade_info) {
730    map_xtp_order_id_to_XTPTradeReports_.try_emplace(trade_info.order_xtp_id).first->second.push_back(trade_info);
731}
732
733bool TraderXTP::has_dealt_trade(uint64_t xtp_order_id, const std::string &exec_id) {
734    auto &exec_ids = map_xtp_order_id_to_xtp_trader_ids_.try_emplace(xtp_order_id).first->second;
735    return exec_ids.find(exec_id) != exec_ids.end();
736}
737
738void TraderXTP::add_dealt_trade(uint64_t xtp_order_id, const std::string &exec_id) {
739    map_xtp_order_id_to_xtp_trader_ids_.try_emplace(xtp_order_id).first->second.emplace(exec_id);
740}
741
742bool TraderXTP::on_custom_event(const event_ptr &event) {
743    SPDLOG_DEBUG("msg_type: {}", event->msg_type());
744    switch (event->msg_type()) {
745    case kXTPOrderInfoType:
746        return custom_OnOrderEvent(event);
747    case kXTPTradeReportType:
748        return custom_OnTradeEvent(event);
749    case kQueryXTPOrderInfoType:
750        return custom_OnQueryOrder(event);
751    case kQueryXTPTradeReportType:
752        return custom_OnQueryTrade(event);
753    case kCancelOrderErrorType:
754        return custom_OnCancelOrderError(event);
755    case kQueryAssetType:
756        return custom_OnQueryAsset(event);
757    case kQueryPositionType:
758        return custom_OnQueryPosition(event);
759    default:
760        SPDLOG_ERROR("unrecognized msg_type: {}", event->msg_type());
761        return false;
762    }
763}
764
765void TraderXTP::add_traded_volume(uint64_t order_xtp_id, int64_t trade_volume) {
766    map_xtp_order_id_to_traded_volume_.try_emplace(order_xtp_id).first->second += trade_volume;
767}
768
769int64_t TraderXTP::get_traded_volume(uint64_t order_xtp_id) {
770    return map_xtp_order_id_to_traded_volume_.try_emplace(order_xtp_id).first->second;
771}
772
773void TraderXTP::add_action_id(uint64_t xtp_order_id, int64_t action_id) {
774    map_xtp_order_id_to_action_ids_.try_emplace(xtp_order_id).first->second.push(action_id);
775}
776
777uint64_t TraderXTP::get_action_id(uint64_t xtp_order_id) {
778    auto &action_ids = map_xtp_order_id_to_action_ids_.try_emplace(xtp_order_id).first->second;
779    if (not action_ids.empty()) {
780        uint64_t action_id = action_ids.front();
781        action_ids.pop();
782        SPDLOG_DEBUG("xtp_order_id:action_id = {}:{}", xtp_order_id, action_id);
783        return action_id;
784    } else {
785        SPDLOG_ERROR("action_ids is empty");
786        return 0;
787    }
788}
789
790} // namespace kungfu::wingchun::xtp

marketdata_xtp.h文件

xtp的行情源头文件定义

 1#ifndef KUNGFU_XTP_EXT_MARKET_DATA_H
 2#define KUNGFU_XTP_EXT_MARKET_DATA_H
 3
 4#include <kungfu/wingchun/broker/marketdata.h>
 5#include <kungfu/yijinjing/common.h>
 6#include <xtp_quote_api.h>
 7
 8namespace kungfu::wingchun::xtp {
 9class MarketDataXTP : public XTP::API::QuoteSpi, public broker::MarketData {
10public:
11    explicit MarketDataXTP(broker::BrokerVendor &vendor);
12
13    ~MarketDataXTP() override;
14
15    bool subscribe(const std::vector<longfist::types::InstrumentKey> &instrument_keys) override;
16
17    bool subscribe_all() override;
18    bool subscribe_custom(const longfist::types::CustomSubscribe &custom_sub) override;
19    bool unsubscribe(const std::vector<longfist::types::InstrumentKey> &instrument_keys) override { return false; };
20
21    /// 当客户端与行情后台通信连接断开时,该方法被调用。
22    ///@param reason 错误原因,请与错误代码表对应
23    ///@remark
24    ///  api不会自动重连,当断线发生时,请用户自行选择后续操作。可以在此函数中调用Login重新登录。注意用户重新登录后,需要重新订阅行情
25    void OnDisconnected(int reason) override;
26
27    /// 订阅行情应答,包括股票、指数和期权
28    ///@param ticker 详细的合约订阅情况
29    ///@param error_info 订阅合约发生错误时的错误信息,当error_info为空,或者error_info.error_id为0时,表明没有错误
30    ///@param is_last 是否此次订阅的最后一个应答,当为最后一个的时候为true,如果为false,表示还有其他后续消息响应
31    ///@remark 每条订阅的合约均对应一条订阅应答,需要快速返回,否则会堵塞后续消息,当堵塞严重时,会触发断线
32    void OnSubMarketData(XTPST *ticker, XTPRI *error_info, bool is_last) override;
33
34    /// 深度行情通知,包含买一卖一队列
35    ///@param market_data 行情数据
36    ///@param bid1_qty 买一队列数据
37    ///@param bid1_count 买一队列的有效委托笔数
38    ///@param max_bid1_count 买一队列总委托笔数
39    ///@param ask1_qty 卖一队列数据
40    ///@param ask1_count 卖一队列的有效委托笔数
41    ///@param max_ask1_count 卖一队列总委托笔数
42    ///@remark 需要快速返回,否则会堵塞后续消息,当堵塞严重时,会触发断线
43    void OnDepthMarketData(XTPMD *market_data, int64_t bid1_qty[], int32_t bid1_count, int32_t max_bid1_count,
44                        int64_t ask1_qty[], int32_t ask1_count, int32_t max_ask1_count) override;
45
46    /// 订阅逐笔行情应答,包括股票、指数和期权
47    ///@param ticker 详细的合约订阅情况
48    ///@param error_info 订阅合约发生错误时的错误信息,当error_info为空,或者error_info.error_id为0时,表明没有错误
49    ///@param is_last 是否此次订阅的最后一个应答,当为最后一个的时候为true,如果为false,表示还有其他后续消息响应
50    ///@remark 每条订阅的合约均对应一条订阅应答,需要快速返回,否则会堵塞后续消息,当堵塞严重时,会触发断线
51    void OnSubTickByTick(XTPST *ticker, XTPRI *error_info, bool is_last) override;
52
53    /// 逐笔行情通知,包括股票、指数和期权
54    ///@param tbt_data
55    /// 逐笔行情数据,包括逐笔委托和逐笔成交,此为共用结构体,需要根据type来区分是逐笔委托还是逐笔成交,需要快速返回,否则会堵塞后续消息,当堵塞严重时,会触发断线
56    void OnTickByTick(XTPTBT *tbt_data) override;
57
58    /// 订阅全市场的股票逐笔行情应答
59    ///@param exchange_id
60    /// 表示当前全订阅的市场,如果为XTP_EXCHANGE_UNKNOWN,表示沪深全市场,XTP_EXCHANGE_SH表示为上海全市场,XTP_EXCHANGE_SZ表示为深圳全市场
61    ///@param error_info
62    /// 取消订阅合约时发生错误时返回的错误信息,当error_info为空,或者error_info.error_id为0时,表明没有错误
63    ///@remark 需要快速返回
64    void OnSubscribeAllTickByTick(XTP_EXCHANGE_TYPE exchange_id, XTPRI *error_info) override;
65
66    /// 查询可交易合约的应答
67    ///@param ticker_info 可交易合约信息
68    ///@param error_info
69    /// 查询可交易合约时发生错误时返回的错误信息,当error_info为空,或者error_info.error_id为0时,表明没有错误
70    ///@param is_last
71    /// 是否此次查询可交易合约的最后一个应答,当为最后一个的时候为true,如果为false,表示还有其他后续消息响应
72    void OnQueryAllTickers(XTPQSI *ticker_info, XTPRI *error_info, bool is_last) override;
73
74    /// 查询合约完整静态信息的应答
75    ///@param ticker_info 合约完整静态信息
76    ///@param error_info
77    /// 查询合约完整静态信息时发生错误时返回的错误信息,当error_info为空,或者error_info.error_id为0时,表明没有错误
78    ///@param is_last
79    /// 是否此次查询合约完整静态信息的最后一个应答,当为最后一个的时候为true,如果为false,表示还有其他后续消息响应
80    void OnQueryAllTickersFullInfo(XTPQFI *ticker_info, XTPRI *error_info, bool is_last) override;
81
82protected:
83    void on_start() override;
84
85    void pre_start() override;
86
87private:
88    XTP::API::QuoteApi *api_{};
89    uint32_t entrust_band_uid_{};
90    uint32_t transaction_band_uid_{};
91
92    inline static thread_local yijinjing::journal::writer_ptr entrust_band_writer_{};
93    inline static thread_local yijinjing::journal::writer_ptr transaction_band_writer_{};
94
95    bool subscribe(const std::vector<std::string> &instruments, const std::string &exchange_id);
96};
97} // namespace kungfu::wingchun::xtp
98
99#endif // KUNGFU_XTP_EXT_MARKET_DATA_H

marketdata_xtp.cpp文件

xtp的行情源函数实现

  1#include "marketdata_xtp.h"
  2#include "serialize_xtp.h"
  3#include "type_convert.h"
  4
  5using namespace kungfu::yijinjing;
  6using namespace kungfu::yijinjing::data;
  7
  8namespace kungfu::wingchun::xtp {
  9struct MDConfiguration {
 10    int client_id;
 11    std::string account_id;
 12    std::string password;
 13    std::string md_ip;
 14    int md_port;
 15    std::string protocol;
 16    int buffer_size;
 17    bool query_instruments;
 18};
 19
 20void from_json(const nlohmann::json &j, MDConfiguration &c) {
 21    j.at("client_id").get_to(c.client_id);
 22    j.at("account_id").get_to(c.account_id);
 23    j.at("password").get_to(c.password);
 24    j.at("md_ip").get_to(c.md_ip);
 25    j.at("md_port").get_to(c.md_port);
 26    c.protocol = j.value("protocol", "tcp");
 27    if (c.protocol != "udp") {
 28        c.protocol = "tcp";
 29    }
 30    c.buffer_size = j.value("buffer_size", 64);
 31    c.query_instruments = j.value<bool>("query_instruments", false);
 32}
 33
 34MarketDataXTP::MarketDataXTP(broker::BrokerVendor &vendor) : MarketData(vendor), api_(nullptr) {
 35    KUNGFU_SETUP_LOG();
 36    SPDLOG_DEBUG("arguments: {}", get_vendor().get_arguments());
 37}
 38
 39MarketDataXTP::~MarketDataXTP() {
 40    if (api_ != nullptr) {
 41        api_->Release();
 42    }
 43}
 44
 45void MarketDataXTP::pre_start() {
 46    entrust_band_uid_ = request_band("market-data-band-entrust", 256);
 47    transaction_band_uid_ = request_band("market-data-band-transaction", 256);
 48}
 49
 50void MarketDataXTP::on_start() {
 51    MDConfiguration config = nlohmann::json::parse(get_config());
 52    if (config.client_id < 1 or config.client_id > 99) {
 53        SPDLOG_ERROR("client_id must between 1 and 99");
 54    }
 55    auto md_ip = config.md_ip.c_str();
 56    auto account_id = config.account_id.c_str();
 57    auto password = config.password.c_str();
 58    auto protocol_type = get_xtp_protocol_type(config.protocol);
 59    std::string runtime_folder = get_runtime_folder();
 60    SPDLOG_INFO("Connecting XTP MD for {} at {}://{}:{}", account_id, config.protocol, md_ip, config.md_port);
 61    api_ =
 62        XTP::API::QuoteApi::CreateQuoteApi(config.client_id, runtime_folder.c_str(), XTP_LOG_LEVEL::XTP_LOG_LEVEL_INFO);
 63    if (config.protocol == "udp") {
 64        api_->SetUDPBufferSize(config.buffer_size);
 65    }
 66    api_->RegisterSpi(this);
 67    if (api_->Login(md_ip, config.md_port, account_id, password, protocol_type) == 0) {
 68        update_broker_state(BrokerState::LoggedIn);
 69        update_broker_state(BrokerState::Ready);
 70        SPDLOG_INFO("login success! (account_id) {}", config.account_id);
 71        if (config.query_instruments and not check_if_stored_instruments(time::strfnow("%Y%m%d"))) {
 72            api_->QueryAllTickers(XTP_EXCHANGE_SH);
 73            api_->QueryAllTickers(XTP_EXCHANGE_SZ);
 74            api_->QueryAllTickersFullInfo(XTP_EXCHANGE_SH);
 75            api_->QueryAllTickersFullInfo(XTP_EXCHANGE_SZ);
 76        }
 77    } else {
 78        update_broker_state(BrokerState::LoginFailed);
 79        SPDLOG_ERROR("failed to login, [{}] {}", api_->GetApiLastError()->error_id, api_->GetApiLastError()->error_msg);
 80    }
 81}
 82
 83bool MarketDataXTP::subscribe(const std::vector<InstrumentKey> &instrument_keys) {
 84    bool result = true;
 85    std::vector<std::string> sse_tickers;
 86    std::vector<std::string> sze_tickers;
 87    for (const auto &inst : instrument_keys) {
 88        std::string ticker = inst.instrument_id;
 89        if (strcmp(inst.exchange_id, EXCHANGE_SSE) == 0) {
 90            sse_tickers.push_back(ticker);
 91        } else if (strcmp(inst.exchange_id, EXCHANGE_SZE) == 0) {
 92            sze_tickers.push_back(ticker);
 93        }
 94    }
 95    if (!sse_tickers.empty()) {
 96        result &= subscribe(sse_tickers, EXCHANGE_SSE);
 97    }
 98    if (!sze_tickers.empty()) {
 99        result &= subscribe(sze_tickers, EXCHANGE_SZE);
100    }
101    return result;
102}
103
104bool MarketDataXTP::subscribe(const std::vector<std::string> &instruments, const std::string &exchange_id) {
105    int size = instruments.size();
106    std::vector<char *> insts;
107    insts.reserve(size);
108    std::transform(instruments.begin(), instruments.end(), std::back_inserter(insts),
109                [](auto &s) { return const_cast<char *>(s.c_str()); });
110    XTP_EXCHANGE_TYPE exchange;
111    to_xtp_exchange(exchange, exchange_id.c_str());
112    int level1_result = api_->SubscribeMarketData(insts.data(), size, exchange);
113    int level2_result = api_->SubscribeTickByTick(insts.data(), size, exchange);
114    SPDLOG_INFO("subscribe {} from {}, l1 rtn code {}, l2 rtn code {}", size, exchange_id, level1_result,
115                level2_result);
116    return level1_result == 0 and level2_result == 0;
117}
118
119bool MarketDataXTP::subscribe_all() {
120    auto result = api_->SubscribeAllMarketData() && api_->SubscribeAllTickByTick();
121    SPDLOG_INFO("subscribe all, rtn code {}", result);
122    return result;
123}
124
125bool MarketDataXTP::subscribe_custom(const longfist::types::CustomSubscribe &custom_sub) {
126    SPDLOG_INFO("custom_sub, market_type {} instrument_type {} data_type {} update_time {}",
127                int(custom_sub.market_type), long(custom_sub.instrument_type), long(custom_sub.data_type),
128                long(custom_sub.update_time));
129    subscribe_all();
130    return true;
131}
132
133void MarketDataXTP::OnDisconnected(int reason) {
134    SPDLOG_ERROR("disconnected with reason {}", reason);
135    update_broker_state(BrokerState::DisConnected);
136}
137
138void MarketDataXTP::OnSubMarketData(XTPST *ticker, XTPRI *error_info, bool is_last) {
139    if (nullptr != ticker) {
140        SPDLOG_DEBUG("XTPST: {}, is_last: {}", to_string(*ticker), is_last);
141    }
142    if (error_info != nullptr && error_info->error_id != 0) {
143        SPDLOG_ERROR("failed to subscribe level 1, [{}] {}", error_info->error_id, error_info->error_msg);
144    }
145}
146
147void MarketDataXTP::OnSubTickByTick(XTPST *ticker, XTPRI *error_info, bool is_last) {
148    if (nullptr != ticker) {
149        SPDLOG_DEBUG("XTPST: {}, is_last: {}", to_string(*ticker), is_last);
150    }
151    if (error_info != nullptr && error_info->error_id != 0) {
152        SPDLOG_ERROR("failed to subscribe level 2, [{}] {}", error_info->error_id, error_info->error_msg);
153    }
154}
155
156void MarketDataXTP::OnSubscribeAllTickByTick(XTP_EXCHANGE_TYPE exchange_id, XTPRI *error_info) {
157    if (error_info != nullptr && error_info->error_id != 0) {
158        SPDLOG_ERROR("failed to subscribe level 2 all, [{}] {}", error_info->error_id, error_info->error_msg);
159    }
160}
161
162void MarketDataXTP::OnQueryAllTickers(XTPQSI *ticker_info, XTPRI *error_info, bool is_last) {
163    if (nullptr != error_info && error_info->error_id != 0) {
164        SPDLOG_ERROR("error_id : {} , error_msg : {}", error_info->error_id, error_info->error_msg);
165        return;
166    }
167
168    if (nullptr == ticker_info) {
169        SPDLOG_ERROR("ticker_info is nullptr");
170        return;
171    }
172
173    Instrument &instrument = get_public_writer()->open_data<Instrument>(0);
174    from_xtp(ticker_info, instrument);
175    get_public_writer()->close_data();
176}
177
178void MarketDataXTP::OnDepthMarketData(XTPMD *market_data, int64_t *bid1_qty, int32_t bid1_count, int32_t max_bid1_count,
179                                    int64_t *ask1_qty, int32_t ask1_count, int32_t max_ask1_count) {
180    if (nullptr == market_data) {
181        SPDLOG_ERROR("XTPMD is nullptr");
182    }
183
184    Quote &quote = get_public_writer()->open_data<Quote>(0);
185    from_xtp(*market_data, quote);
186    get_public_writer()->close_data();
187}
188
189void MarketDataXTP::OnTickByTick(XTPTBT *tbt_data) {
190    if (tbt_data->type == XTP_TBT_ENTRUST) {
191        if (tbt_data->entrust.ord_type == 'D') {
192            if (not transaction_band_writer_) {
193                while (not has_band_writer(transaction_band_uid_)) {
194                    std::this_thread::sleep_for(std::chrono::milliseconds(1));
195                }
196                transaction_band_writer_ = get_band_writer(transaction_band_uid_);
197            }
198            Transaction &transaction = transaction_band_writer_->open_data<Transaction>(0);
199            from_xtp(*tbt_data, transaction);
200            transaction_band_writer_->close_data();
201        } else {
202            if (not entrust_band_writer_) {
203                while (not has_band_writer(entrust_band_uid_)) {
204                    std::this_thread::sleep_for(std::chrono::milliseconds(1));
205                }
206                entrust_band_writer_ = get_band_writer(entrust_band_uid_);
207            }
208            Entrust &entrust = entrust_band_writer_->open_data<Entrust>(0);
209            from_xtp(*tbt_data, entrust);
210            entrust_band_writer_->close_data();
211        }
212    } else if (tbt_data->type == XTP_TBT_TRADE) {
213        if (not transaction_band_writer_) {
214            while (not has_band_writer(transaction_band_uid_)) {
215                std::this_thread::sleep_for(std::chrono::milliseconds(1));
216            }
217            transaction_band_writer_ = get_band_writer(transaction_band_uid_);
218        }
219        Transaction &transaction = transaction_band_writer_->open_data<Transaction>(0);
220        from_xtp(*tbt_data, transaction);
221        transaction_band_writer_->close_data();
222    }
223}
224
225void MarketDataXTP::OnQueryAllTickersFullInfo(XTPQFI *ticker_info, XTPRI *error_info, bool is_last) {
226    if (nullptr != error_info && error_info->error_id != 0) {
227        SPDLOG_INFO("error_id : {} , error_msg : {}", error_info->error_id, error_info->error_msg);
228        return;
229    }
230
231    if (nullptr == ticker_info) {
232        SPDLOG_ERROR("ticker_info is nullptr");
233        return;
234    }
235
236    Instrument &instrument = get_public_writer()->open_data<Instrument>(0);
237    instrument.instrument_id = ticker_info->ticker;
238    if (ticker_info->exchange_id == 1) {
239        instrument.exchange_id = EXCHANGE_SSE;
240    } else if (ticker_info->exchange_id == 2) {
241        instrument.exchange_id = EXCHANGE_SZE;
242    }
243
244    memcpy(instrument.product_id, ticker_info->ticker_name, strlen(ticker_info->ticker_name));
245    instrument.instrument_type = get_instrument_type(instrument.exchange_id, instrument.instrument_id);
246    SPDLOG_TRACE("instrument {}", instrument.to_string());
247    get_public_writer()->close_data();
248
249    if (is_last) {
250        record_stored_instruments_trading_day(time::strfnow("%Y%m%d"));
251    }
252}
253} // namespace kungfu::wingchun::xtp

exports.cpp文件

xtp柜台和行情源绑定到python模块

1#include "marketdata_xtp.h"
2#include "trader_xtp.h"
3
4#include <kungfu/wingchun/extension.h>
5
6KUNGFU_EXTENSION() {
7    KUNGFU_DEFINE_MD(kungfu::wingchun::xtp::MarketDataXTP);
8    KUNGFU_DEFINE_TD(kungfu::wingchun::xtp::TraderXTP);
9}

编译指令

1# 在package.json所在的目录执行
2yarn build

编译后目录结构:

xtp/                                        # xtp柜台名称
├── src/
│   └── cpp
│       ├── buffer_data.h
│       ├── exports.cpp
│       ├── marketdata_xtp.cpp
│       ├── marketdata_xtp.h
│       ├── serialize_xtp.h
│       ├── trader_xtp.cpp
│       ├── trader_xtp.h
│       └── type_convert.h
└── package.json                            # 编译配置信息
--------以下为编译后自动生成内容------------------------------------
├── __kungfulibs__                          # 根据package.json的kungfuDependencies下载
│   └── xtp                                 # 柜台名
│       └── v2.2.37.4                       # 柜台API版本
│           ├── doc                         # 柜台文档
│           ├── include                     # 柜台头文件
│           └── lib                         # 柜台库文件
├── CMakeLists.txt                          # 根据package.json自动生成
├── build                                   # 编译中间目录
└── dist                                    # 编译结果
    └── xtp

备注

如果是已经添加到Kungfu柜台仓库的柜台API版本, 在执行yarn build之后就可以自动从仓库上下载;

如果柜台API版本不在Kungfu柜台仓库列表中, 需要手动创建以下目录

手动添加自定义的库目录:

├── __kungfulibs__                          # kungfu寻找C++库的目录
│   └── {对接的柜台名字}                     # 柜台名, 需要和package.json中的kungfuDependencies配置的key相同
│       └── {柜台的版本号}                   # 柜台API版本, 需要和package.json中的kungfuDependencies配置的value相同
│           ├── doc                         # 柜台文档
│           ├── include                     # 柜台头文件
│           └── lib                         # 柜台库文件

启动插件

添加交易柜台

将dist目录下xtp整个目录拷贝到 {kungfu安装目录}/resources/app/kungfu-extensions 目录下, 启动Kungfu图形化界面,

选择主面板中的”交易账户”界面点击添加, 选择xtp模块.

_images/xtp%E8%B4%A6%E6%88%B7-25.png

添加行情源

选择主面板中的”行情源”界面点击添加, 选择xtp模块.

_images/xtp%E8%A1%8C%E6%83%85-2-5.png

策略范例

Python策略范例

源码目录结构:

strategy-python-101/                        # 名字随意
├── src/
│   └── python
|       └── KungfuStrategy101Python         # 必须与package.json中kungfuConfig的key相同
|           └── __init__.py                 # python策略代码,文件名不可更改
└── package.json                            # 编译配置信息

编译后文件目录结构:

strategy-python-101/
├── src/
│   └── python
|       └── KungfuStrategy101Python
|           └── __init__.py
├── package.json
├── __pypackages__/                                         # Python模块库, 自动生成
├── dist/                                                   # 编译打包出来的二进制文件
|   └── KungfuStrategy101Python
|       └── KungfuStrategy101Python.cp39-win_amd64.pyd      # 编译之后的二进制文件
├── pdm.lock                                                # build后下载依赖库自动生成的文件
└── pyproject.toml                                          # build后下载依赖库自动生成的文件

package.json:

{
    "name": "@kungfu-trader/examples-strategy-python",
    "author": {
        "name": "Kungfu Trader",
        "email": "info@kungfu.link"
    },
    "description": "KungFu Strategy 101 - Python Demo",
    "license": "Apache-2.0",
    "kungfuBuild": {
        "python": {
            "dependencies": {}
        }
    },
    "kungfuConfig": {
        "key": "KungfuStrategy101Python"               # key对应的是 策略文件中策略文件(.py文件)所在文件夹的名字 , 这个名字不能有 _ , - . 比如命名不可以是 : kungfu-demo , kungfu_demo
    }
}

__init__.py 策略范例 :

 1# __init__.py
 2
 3import random
 4from kungfu.wingchun.constants import *
 5import kungfu
 6
 7lf = kungfu.__binding__.longfist
 8wc = kungfu.__binding__.wingchun
 9yjj = kungfu.__binding__.yijinjing
10
11source = "sim"  # 目标交易账户的柜台名称
12account = "fill"  # 目标交易账户的账户号, 需添加 sim 柜台的账户号为 fill 的账户
13md_source = "sim"  # 目标行情源的柜台名称, 需添加 sim 行情源
14
15
16def pre_start(context):
17    context.log.info("pre start")
18    context.add_account(source, account)  # 添加交易账户
19    context.subscribe(md_source, ["600000", "600004", "600009"], Exchange.SSE)  # 订阅行情
20    context.subscribe(md_source, ["300033", "300059"], Exchange.SZE)  # 订阅行情
21    context.subscribe(md_source, ["rb2401"], Exchange.SHFE)  # 订阅行情
22    context.subscribe(md_source, ["sc2401"], Exchange.INE)  # 订阅行情
23    # context.subscribe_operator("bar", "123") # 需从算子入口添加bar插件, 并定义bar的id为123
24    context.throttle_insert_order = {}
25
26
27def post_start(context):
28    account_uid = context.get_account_uid(source, account)
29    context.log.info(f"account {source} {account}, account_uid: {account_uid}")
30
31
def on_quote(context, quote, location, dest):
    """Place one randomly parameterised order per instrument, at most every 10 seconds.

    Side, offset and price type are drawn at random. A buy is priced at the
    first ask level and a sell at the first bid level. The volume is 3 for
    futures and 300 for every other instrument type.
    """
    # Timestamps are compared in nanoseconds: 10 s == 10_000_000_000 ns.
    throttle_ns = 10 * 1_000_000_000
    # Read the clock once so the check and the stored timestamp agree.
    now = context.now()
    if now - context.throttle_insert_order.get(quote.instrument_id, 0) < throttle_ns:
        return
    context.throttle_insert_order[quote.instrument_id] = now

    # The side is drawn once; the original drew it twice and discarded the first value.
    side = random.choice([Side.Buy, Side.Sell])
    offset = random.choice([Offset.Open, Offset.Close])
    price = quote.ask_price[0] if side == Side.Buy else quote.bid_price[0]
    price_type = random.choice([PriceType.Any, PriceType.Limit])
    volume = 3 if quote.instrument_type == InstrumentType.Future else 300
    order_id = context.insert_order(
        quote.instrument_id,
        quote.exchange_id,
        source,
        account,
        price,
        volume,
        price_type,
        side,
        offset,
    )
    context.log.info(f"insert order: {order_id}")
56
57
58# 监听算子广播信息
59def on_synthetic_data(context, synthetic_dataa, location, dest):
60    context.log.info("on_synthetic_data: {}".format(synthetic_dataa))
61
62
def on_order(context, order, location, dest):
    """Log the order update and cancel the order unless its status is final."""
    context.log.info(f"on_order: {order}, from {location} to {dest}")

    if wc.utils.is_final_status(order.status):
        return
    context.cancel_order(order.order_id)
68
69
70def on_trade(context, trade, location, dest):
71    context.log.info(f"on_trade: {trade}, from {location} to {dest}")

通过主面板的 策略进程->添加->策略路径 选择 KungfuStrategy101Python.cp39-win_amd64.pyd, 点击启动就可以运行Python编译后的策略代码


CPP策略范例

源码目录结构:

strategy-cpp-101/
├── src/
│   └── cpp
|       └── strategy.cpp                    # cpp策略代码
└── package.json                            # 编译配置信息

编译后文件目录结构:

strategy-cpp-101/
├── src/
│   └── cpp
|       └── strategy.cpp                                # cpp策略代码
├── package.json
├── dist/                                               # 编译打包出来的二进制文件
|   └── KungfuStrategy101Cpp
|       └── KungfuStrategy101Cpp.cp39-win_amd64.pyd     # 编译之后的二进制文件
└── build                                               # build 编译生成中间文件

package.json:

{
    "name": "@kungfu-trader/examples-strategy-cpp",
    "author": "kungfu-trader",
    "description": "KungFu Strategy 101 - C++ Demo",
    "license": "Apache-2.0",
    "kungfuConfig": {
        "key": "KungfuStrategy101Cpp"                   # 编译之后的二进制文件所在文件夹名
    },
    "kungfuBuild": {                                    # 打包模块相关
        "cpp": {
        "target": "bind/python"
        }
    }
}
 1// strategy.cpp
 2#include <kungfu/wingchun/extension.h>
 3#include <kungfu/wingchun/strategy/context.h>
 4#include <kungfu/wingchun/strategy/strategy.h>
 5
 6using namespace kungfu::longfist::enums;
 7using namespace kungfu::longfist::types;
 8using namespace kungfu::wingchun::strategy;
 9using namespace kungfu::yijinjing::data;
10
11KUNGFU_MAIN_STRATEGY(KungfuStrategy101) {
12public:
13KungfuStrategy101() = default;
14~KungfuStrategy101() = default;
15
16void pre_start(Context_ptr & context) override {
17    SPDLOG_INFO("preparing strategy");
18    SPDLOG_INFO("arguments: {}", context->get_arguments());
19
20    context->add_account("sim", "fill");
21    context->subscribe("sim", {"600000"}, {"SSE"});
22}
23
24void post_start(Context_ptr & context) override { SPDLOG_INFO("strategy started"); }
25
26void on_quote(Context_ptr & context, const Quote &quote, const location_ptr &location, uint32_t dest) override {
27    SPDLOG_INFO("Quote: {}  location: {}", quote.to_string(), location->to_string());
28    context->insert_order(quote.instrument_id, quote.exchange_id, "sim", "fill", quote.last_price, 200,
29                        PriceType::Limit, Side::Buy, Offset::Open);
30}
31
32void on_order(Context_ptr & context, const Order &order, const location_ptr &location, uint32_t dest) override {
33    SPDLOG_INFO("Order: {}", order.to_string());
34}
35
36void on_trade(Context_ptr & context, const Trade &trade, const location_ptr &location, uint32_t dest) override {
37    SPDLOG_INFO("Trade: {}", trade.to_string());
38}
39
40void on_tree(Context_ptr & context, const Tree &tree, const location_ptr &location, uint32_t dest) override {
41    SPDLOG_INFO("on tree: {}", tree.to_string());
42}
43
44void on_synthetic_data(Context_ptr & context, const SyntheticData &synthetic_data, const location_ptr &location,
45                        uint32_t dest) override {
46    SPDLOG_INFO("on_synthetic_data: {} ", synthetic_data.to_string());
47}
48
49void on_broker_state_change(Context_ptr & context, const BrokerStateUpdate &broker_state_update,
50                            const location_ptr &location) override {
51    SPDLOG_INFO("on broker state changed: {}", broker_state_update.to_string());
52}
53
54void on_operator_state_change(Context_ptr & context, const OperatorStateUpdate &operator_state_update,
55                                const location_ptr &location) override {
56    SPDLOG_INFO("on operator state changed: {}", operator_state_update.to_string());
57}
58};

通过主面板的 策略进程->添加->策略路径 选择 KungfuStrategy101Cpp.cp39-win_amd64.pyd, 点击启动就可以运行CPP编译后的策略代码


CPP策略可执行程序范例

源码目录结构:

strategy-cpp-101-exe/
├── src/
│   └── cpp
|       └── strategy.cpp                    # cpp策略代码
└── package.json                            # 编译配置信息

编译后文件目录结构:

strategy-cpp-101-exe/
├── src/
│   └── cpp
|       └── strategy.cpp                    # cpp策略代码
├── package.json
├── dist/                                   # 编译打包出来的二进制文件
|   └── KungfuStrategy101CppExe
|       └── KungfuStrategy101CppExe.exe     # 可执行文件
└── build                                   # build 编译生成中间文件

package.json:

{
    "name": "@kungfu-trader/examples-strategy-cpp",
    "author": "kungfu-trader",
    "description": "KungFu Strategy 101 - C++ Demo",
    "license": "Apache-2.0",
    "kungfuConfig": {
        "key": "KungfuStrategy101CppExe"
    },
    "kungfuBuild": {
        "cpp": {
            "target": "exe"
        }
    }
}
 1// strategy.cpp
 2#include <kungfu/wingchun/strategy/context.h>
 3#include <kungfu/wingchun/strategy/runner.h>
 4#include <kungfu/wingchun/strategy/strategy.h>
 5
 6using namespace kungfu::longfist::enums;
 7using namespace kungfu::longfist::types;
 8using namespace kungfu::wingchun::strategy;
 9using namespace kungfu::yijinjing::data;
10
11class KungfuStrategy101 : public Strategy {
12public:
13KungfuStrategy101() = default;
14~KungfuStrategy101() = default;
15
16void pre_start(Context_ptr &context) override {
17    SPDLOG_INFO("preparing strategy");
18    SPDLOG_INFO("arguments: {}", context->get_arguments());
19
20    context->add_account("sim", "fill");
21    context->subscribe("sim", {"600000"}, {"SSE"});
22}
23
24void post_start(Context_ptr &context) override { SPDLOG_INFO("strategy started"); }
25
26void on_quote(Context_ptr &context, const Quote &quote, const location_ptr &location, uint32_t dest) override {
27    SPDLOG_INFO("Quote: {}  location: {}", quote.to_string(), location->to_string());
28    context->insert_order(quote.instrument_id, quote.exchange_id, "sim", "fill", quote.last_price, 200,
29                        PriceType::Limit, Side::Buy, Offset::Open);
30}
31
32void on_order(Context_ptr &context, const Order &order, const location_ptr &location, uint32_t dest) override {
33    SPDLOG_INFO("Order: {}", order.to_string());
34}
35
36void on_trade(Context_ptr &context, const Trade &trade, const location_ptr &location, uint32_t dest) override {
37    SPDLOG_INFO("Trade: {}", trade.to_string());
38}
39
40void on_tree(Context_ptr &context, const Tree &tree, const location_ptr &location, uint32_t dest) override {
41    SPDLOG_INFO("on tree: {}", tree.to_string());
42}
43
44void on_synthetic_data(Context_ptr &context, const SyntheticData &synthetic_data, const location_ptr &location,
45                        uint32_t dest) override {
46    SPDLOG_INFO("on_synthetic_data: {} ", synthetic_data.to_string());
47}
48
49void on_broker_state_change(Context_ptr &context, const BrokerStateUpdate &broker_state_update,
50                            const location_ptr &location) override {
51    SPDLOG_INFO("on broker state changed: {}", broker_state_update.to_string());
52}
53
54void on_operator_state_change(Context_ptr &context, const OperatorStateUpdate &operator_state_update,
55                                const location_ptr &location) override {
56    SPDLOG_INFO("on operator state changed: {}", operator_state_update.to_string());
57}
58};
59
60int main(int argc, char **argv) {
61    SPDLOG_INFO("runner1 add strategy1");
62    Runner runner(std::make_shared<locator>(), "CppStrategy", "demo01exe", mode::LIVE, false);
63    SPDLOG_INFO("runner");
64    runner.add_strategy(std::make_shared<KungfuStrategy101>());
65    runner.run();
66    SPDLOG_INFO("Over");
67    return 0;
68}

直接运行 KungfuStrategy101CppExe.exe 程序就可以运行以上策略

小技巧

  1. 需要将 {Kungfu安装目录}/resource/kfc/ 目录下的Kungfu.dll放在KungfuStrategy101CppExe.exe同一个目录下, 或者配置系统环境变量(如 PATH)使得程序运行时可以找到动态库Kungfu.dll

  2. 尽量使用命令行运行, 鼠标双击运行时程序退出后会自动关闭命令行界面, 必须到Kungfu运行home目录下找到对应log文件才能查看运行日志