跳至主要内容

SDKs and Tools

簡介

Webull為專業客戶提供Python和Java的軟體開發套件(SDK),同時也提供一個基於網頁的機構投資人入口,方便客戶登入後查閱帳戶資金、持倉、訂單等資訊。該入口亦支援訂閱高級付費市場數據

SDK安裝

透過pip安裝(需 Python 3.8至3.11版本)

pip3 install --upgrade webull-openapi-python-sdk

API位址說明

注意

HTTP API 位址用於標準的 HTTP 請求

交易消息推送位址用於即時推送通知,例如訂單狀態更新

行情數據消息推送位址用於即時行情數據更新

測試環境

HTTP API: api.sandbox.webull.hk
Trading message push: events-api.sandbox.webull.hk
Market data message push: data-api.sandbox.webull.hk

生產環境

HTTP API: api.webull.hk
Trading message push: events-api.webull.hk
Market data message push: data-api.webull.hk

呼叫測試 API

如何獲取測試環境的 App Key 和 App Secret:

您可以使用測試帳戶表中提供的測試帳戶進行測試。

交易請求範例

import json
import unittest
import uuid
from time import sleep

from webull.core.client import ApiClient
from webull.data.common.category import Category
from webull.trade.trade_client import TradeClient

optional_api_endpoint = "<api_endpoint>"
your_app_key = "<your_app_key>"
your_app_secret = "<your_app_secret>"
region_id = "<region_id>"
account_id = "<your_account_id>"
api_client = ApiClient(your_app_key, your_app_secret, region_id)
api_client.add_endpoint(region_id, optional_api_endpoint)


if __name__ == '__main__':
trade_client = TradeClient(api_client)

res = trade_client.account_v2.get_account_list()
if res.status_code == 200:
print("account_list=" + json.dumps(res.json(), indent=4))

res = trade_client.account_v2.get_account_balance(account_id)
if res.status_code == 200:
print("account_balance=" + json.dumps(res.json(), indent=4))

res = trade_client.account_v2.get_account_position(account_id)
if res.status_code == 200:
print("account_position=" + json.dumps(res.json(), indent=4))

preview_orders = {
"symbol": "AAPL",
"instrument_type": "EQUITY",
"market": "US",
"order_type": "MARKET",
"quantity": "1",
"support_trading_session": "N",
"side": "BUY",
"time_in_force": "DAY",
"entrust_type": "QTY"
}
res = trade_client.order_v2.preview_order(account_id=account_id, preview_orders=preview_orders)
if res.status_code == 200:
print("preview_res=" + json.dumps(res.json(), indent=4))

client_order_id = uuid.uuid4().hex
new_orders = {
"client_order_id": client_order_id,
"symbol": "AAPL",
"instrument_type": "EQUITY",
"market": "US",
"order_type": "LIMIT",
"limit_price": "188",
"quantity": "1",
"support_trading_session": "N",
"side": "BUY",
"time_in_force": "DAY",
"entrust_type": "QTY",
}

# This is an optional feature; you can still make a request without setting it.
custom_headers_map = {"category": Category.US_STOCK.name}
trade_client.order_v2.add_custom_headers(custom_headers_map)
res = trade_client.order_v2.place_order(account_id=account_id, new_orders=new_orders)
trade_client.order_v2.remove_custom_headers()
if res.status_code == 200:
print("place_order_res=" + json.dumps(res.json(), indent=4))
sleep(5)

modify_orders = {
"client_order_id": client_order_id,
"quantity": "100",
"limit_price": "200"
}
res = trade_client.order_v2.replace_order(account_id=account_id, modify_orders=modify_orders)
if res.status_code == 200:
print("replace_order_res=" + json.dumps(res.json(), indent=4))
sleep(5)

res = trade_client.order_v2.cancel_order_v2(account_id=account_id, client_order_id=client_order_id)
if res.status_code == 200:
print("cancel_order_res=" + json.dumps(res.json(), indent=4))

res = trade_client.order_v2.get_order_history_request(account_id=account_id)
if res.status_code == 200:
print("order_history_res=" + json.dumps(res.json(), indent=4))

# order detail
res = trade_client.order_v2.get_order_detail(account_id=account_id, client_order_id=client_order_id)
if res.status_code == 200:
print("order detail=" + json.dumps(res.json(), indent=4))

# Options
# For option order inquiries, please use the V2 query interface: api.order_v2.get_order_detail(account_id, client_order_id).
client_order_id = uuid.uuid4().hex
option_new_orders = [
{
"client_order_id": client_order_id,
"combo_type": "NORMAL",
"order_type": "LIMIT",
"quantity": "1",
"limit_price": "11.25",
"option_strategy": "SINGLE",
"side": "BUY",
"time_in_force": "GTC",
"entrust_type": "QTY",
"orders": [
{
"side": "BUY",
"quantity": "1",
"symbol": "AAPL",
"strike_price": "249.0",
"init_exp_date": "2025-08-15",
"instrument_type": "OPTION",
"option_type": "CALL",
"market": "US"
}
]
}
]
# preview
res = trade_client.order_v2.preview_option(account_id, option_new_orders)
if res.status_code == 200:
print("preview option=" + json.dumps(res.json(), indent=4))
sleep(5)
# place

# This is an optional feature; you can still make a request without setting it.
custom_headers_map = {"category": Category.US_OPTION.name}
trade_client.order_v2.add_custom_headers(custom_headers_map)
res = trade_client.order_v2.place_option(account_id, option_new_orders)
trade_client.order_v2.remove_custom_headers()
if res.status_code == 200:
print("place option=" + json.dumps(res.json(), indent=4))
sleep(5)

# replace
option_modify_orders = [
{
"client_order_id": client_order_id,
"quantity": "2",
"limit_price": "11.3",
"orders": [
{
"client_order_id": client_order_id,
"quantity": "2"
}
]
}
]
res = trade_client.order_v2.replace_option(account_id, option_modify_orders)
if res.status_code == 200:
print("replace option=" + json.dumps(res.json(), indent=4))
sleep(5)

# cancel
res = trade_client.order_v2.cancel_option(account_id, client_order_id)
if res.status_code == 200:
print("cancel option=" + json.dumps(res.json(), indent=4))

行情範例(Http)


from webull.data.common.category import Category
from webull.data.common.timespan import Timespan
from webull.core.client import ApiClient
from webull.data.data_client import DataClient

optional_api_endpoint = "<api_endpoint>"
your_app_key = "<your_app_key>"
your_app_secret = "<your_app_secret>"
region_id = "<region_id>"
api_client = ApiClient(your_app_key, your_app_secret, region_id)
api_client.add_endpoint(region_id, optional_api_endpoint)

if __name__ == '__main__':
data_client = DataClient(api_client)

trading_sessions = ["PRE", "RTH", "ATH", "OVN"]
res = data_client.instrument.get_instrument("AAPL", Category.US_STOCK.name)
if res.status_code == 200:
print('get_instrument:', res.json())

res = data_client.market_data.get_snapshot('AAPL', Category.US_STOCK.name, extend_hour_required=True, overnight_required=True)
if res.status_code == 200:
print('get_snapshot:', res.json())

res = data_client.market_data.get_history_bar('AAPL', Category.US_STOCK.name, Timespan.M1.name)
if res.status_code == 200:
print('get_history_bar:', res.json())

res = data_client.market_data.get_batch_history_bar(['AAPL', 'TSLA'], Category.US_STOCK.name, Timespan.M1.name, 1)
if res.status_code == 200:
print('get_batch_history_bar:', res.json())

res = data_client.market_data.get_tick("AAPL", Category.US_STOCK.name, trading_sessions=trading_sessions)
if res.status_code == 200:
print('get_tick:', res.json())

res = data_client.market_data.get_quotes("AAPL", Category.US_STOCK.name, depth=1, overnight_required=True)
if res.status_code == 200:
print('get_quotes:', res.json())

行情範例(mqtt sync)


import logging
import uuid
from logging.handlers import TimedRotatingFileHandler

from webull.data.common.category import Category
from webull.data.common.subscribe_type import SubscribeType
from webull.data.data_streaming_client import DataStreamingClient

your_app_key = "</your_app_key>"
your_app_secret = "</your_app_secret>"
optional_api_endpoint = "</optional_quotes_endpoint>"
optional_quotes_endpoint = "</optional_quotes_endpoint>"
region_id = '<region_id>'

session_id = uuid.uuid4().hex
data_streaming_client = DataStreamingClient(your_app_key, your_app_secret, region_id, session_id,
http_host=optional_api_endpoint,
mqtt_host=optional_quotes_endpoint)

if __name__ == '__main__':
def my_connect_success_func(client, api_client, quotes_session_id):
print("connect success with session_id:%s" % quotes_session_id)
# subscribe
symbols = ['00700']
sub_types = [SubscribeType.QUOTE.name, SubscribeType.SNAPSHOT.name, SubscribeType.TICK.name]
client.subscribe( symbols, Category.HK_STOCK.name, sub_types)

def my_quotes_message_func(client, topic, quotes):
print("receive message: topic:%s, quotes:%s" % (topic, quotes))

def my_subscribe_success_func(client, api_client, quotes_session_id):
print("subscribe success with session_id:%s" % quotes_session_id)

# set connect success callback func
data_streaming_client.on_connect_success = my_connect_success_func
# set quotes receiving callback func
data_streaming_client.on_quotes_message = my_quotes_message_func
# set subscribe success callback func
data_streaming_client.on_subscribe_success = my_subscribe_success_func
# the sync mode, blocking in current thread
data_streaming_client.connect_and_loop_forever()

行情範例(mqtt async)



import time
import uuid

from webull.data.common.category import Category
from webull.data.common.subscribe_type import SubscribeType
from webull.data.data_streaming_client import DataStreamingClient


your_app_key = "</your_app_key>"
your_app_secret = "</your_app_secret>"
optional_api_endpoint = "</optional_quotes_endpoint>"
optional_quotes_endpoint = "</optional_quotes_endpoint>"
region_id = '<region_id>'

session_id = uuid.uuid4().hex
data_streaming_client = DataStreamingClient(your_app_key, your_app_secret, region_id, session_id,
http_host=optional_api_endpoint,
mqtt_host=optional_quotes_endpoint)

if __name__ == '__main__':
def my_connect_success_func(client, api_client, quotes_session_id):
print("connect success with session_id:%s" % quotes_session_id)
# subscribe
symbols = ['00700']
sub_types = [SubscribeType.QUOTE.name, SubscribeType.SNAPSHOT.name, SubscribeType.TICK.name]
client.subscribe(symbols, Category.HK_STOCK.name, sub_types)


def my_quotes_message_func(client, topic, quotes):
print("receive message: topic:%s, quotes:%s" % (topic, quotes))


def my_subscribe_success_func(client, api_client, quotes_session_id):
print("subscribe success with session_id:%s" % quotes_session_id)


# set connect success callback func
data_streaming_client.on_connect_success = my_connect_success_func
# set quotes receiving callback func
data_streaming_client.on_quotes_message = my_quotes_message_func
# set subscribe success callback func
data_streaming_client.on_subscribe_success = my_subscribe_success_func

# the async mode, processing in another thread
data_streaming_client.connect_and_loop_start()

ticker = 60
print("will remove subscription after %s seconds..." % ticker)
time.sleep(ticker)

subscribe_success = data_streaming_client.get_subscribe_success()
quotes_session_id = data_streaming_client.get_session_id()
if subscribe_success:
print("start remove subscription...")
data_streaming_client.unsubscribe(unsubscribe_all=True)
print("remove subscription finish")
else:
print("Do not remove subscription, subscribe_success:%s", subscribe_success)

start_time = time.time()
wait_time = 1
while True:
elapsed = int(time.time() - start_time)
if elapsed >= ticker:
print("Wait completed, start subscribing...")
break
print("Waiting {} seconds before subscription... (elapsed {}s / {}s)".format(wait_time, elapsed, ticker))
time.sleep(wait_time)

# subscribe
connect_success = data_streaming_client.get_connect_success()
if connect_success:
symbols = ['00700']
sub_types = [SubscribeType.QUOTE.name, SubscribeType.SNAPSHOT.name, SubscribeType.TICK.name]
data_streaming_client.subscribe(symbols, Category.HK_STOCK.name, sub_types)
print("add subscription...")
else:
print("Do not add subscription, connect_success:%s", connect_success)

print("will stop processing after %s seconds" % ticker)
time.sleep(ticker)
data_streaming_client.loop_stop()
print("processing done")

Sandbox環境測試帳號

個人对接API測試帳號信息如下:

No.Test Account IDTest App KeyTest Secret Key
1V4H6R3L4VRI33UQ4TGR2NM1VI94b2b7acd2bf0d30d8aea173fceefa238840b4353a6a31ce3ab91e2f99a510272
2OGG4RRLC6EDE98HI920KRBVSKB42bd186fb65ea76de309d69cf12f024e29feb64b59d6b1b6b2d2aa8cea8a1b8d
32DHSQ9B1DMPBFPMPFU2R5SDPB864fc722617af8b5ebb746f50a910e91fa268416fc681d438533f9e9316bab576

意見反饋與交流

您可以通过Webull API服務郵箱地址聯繫我們的工作人員:webull-api-support@webull.com