Skip to main content

SDKs and Tools

Introduction

Webull provides SDKs in both Python and Java for professional clients, and also offers a web-based institutional Portal for clients to log in, allowing them to view account funds, positions, orders, and more. The Portal also supports subscriptions to advanced paid market data.

SDK Installation

Install via pip (Python 3.8 through 3.11 is required):

pip3 install --upgrade webull-openapi-python-sdk

API Host

Note

The HTTP API address is used for standard HTTP requests.

The trading message push address is used for real-time push notifications such as order status updates.

The market data message push address is used for real-time market data updates.

Test Environment

HTTP API: api.sandbox.webull.hk
Trading message push: events-api.sandbox.webull.hk
Market data message push: data-api.sandbox.webull.hk

Production Environment

HTTP API: api.webull.hk
Trading message push: events-api.webull.hk
Market data message push: data-api.webull.hk

Calling the Test API

How to obtain Test Environment App Key and App Secret:

You may use the test accounts listed in the "Sandbox Test Accounts" section at the end of this page; each entry provides a test account ID, App Key, and Secret Key.

Trade Request Example

import json
import unittest
import uuid
from time import sleep

from webull.core.client import ApiClient
from webull.data.common.category import Category
from webull.trade.trade_client import TradeClient

# Replace every "<...>" placeholder below with your own values before running.
optional_api_endpoint = "<api_endpoint>"
your_app_key = "<your_app_key>"
your_app_secret = "<your_app_secret>"
region_id = "<region_id>"
account_id = "<your_account_id>"
api_client = ApiClient(your_app_key, your_app_secret, region_id)
# Registers optional_api_endpoint as the endpoint to use for region_id.
api_client.add_endpoint(region_id, optional_api_endpoint)


def _print_result(label, res):
    """Print the JSON body of a successful response, or report the failure.

    A response counts as successful when ``res.status_code == 200``. Any
    other status is reported instead of being silently skipped, so a failed
    step is visible before the following steps run.
    """
    if res.status_code == 200:
        print(label + "=" + json.dumps(res.json(), indent=4))
    else:
        print("%s request failed, status_code=%s" % (label, res.status_code))


def main():
    """Run the account queries, then an equity and an option order life cycle.

    Each order life cycle is: preview, place, replace, cancel. The script
    pauses for five seconds between the order steps.
    """
    trade_client = TradeClient(api_client)

    # Account queries
    res = trade_client.account_v2.get_account_list()
    _print_result("account_list", res)

    res = trade_client.account_v2.get_account_balance(account_id)
    _print_result("account_balance", res)

    res = trade_client.account_v2.get_account_position(account_id)
    _print_result("account_position", res)

    # Equity order: preview
    preview_orders = {
        "symbol": "AAPL",
        "instrument_type": "EQUITY",
        "market": "US",
        "order_type": "MARKET",
        "quantity": "1",
        "support_trading_session": "N",
        "side": "BUY",
        "time_in_force": "DAY",
        "entrust_type": "QTY",
    }
    res = trade_client.order_v2.preview_order(account_id=account_id, preview_orders=preview_orders)
    _print_result("preview_res", res)

    # Equity order: place
    client_order_id = uuid.uuid4().hex
    new_orders = {
        "client_order_id": client_order_id,
        "symbol": "AAPL",
        "instrument_type": "EQUITY",
        "market": "US",
        "order_type": "LIMIT",
        "limit_price": "188",
        "quantity": "1",
        "support_trading_session": "N",
        "side": "BUY",
        "time_in_force": "DAY",
        "entrust_type": "QTY",
    }

    # This is an optional feature; you can still make a request without setting it.
    custom_headers_map = {"category": Category.US_STOCK.name}
    trade_client.order_v2.add_custom_headers(custom_headers_map)
    try:
        res = trade_client.order_v2.place_order(account_id=account_id, new_orders=new_orders)
    finally:
        # Always remove the custom headers, even if the request raised.
        trade_client.order_v2.remove_custom_headers()
    _print_result("place_order_res", res)
    sleep(5)

    # Equity order: replace
    modify_orders = {
        "client_order_id": client_order_id,
        "quantity": "100",
        "limit_price": "200",
    }
    res = trade_client.order_v2.replace_order(account_id=account_id, modify_orders=modify_orders)
    _print_result("replace_order_res", res)
    sleep(5)

    # Equity order: cancel
    res = trade_client.order_v2.cancel_order_v2(account_id=account_id, client_order_id=client_order_id)
    _print_result("cancel_order_res", res)

    # Order history
    res = trade_client.order_v2.get_order_history_request(account_id=account_id)
    _print_result("order_history_res", res)

    # Order detail
    res = trade_client.order_v2.get_order_detail(account_id=account_id, client_order_id=client_order_id)
    _print_result("order detail", res)

    # Options
    # For option order inquiries, please use the V2 query interface:
    # trade_client.order_v2.get_order_detail(account_id, client_order_id).
    client_order_id = uuid.uuid4().hex
    option_new_orders = [
        {
            "client_order_id": client_order_id,
            "combo_type": "NORMAL",
            "order_type": "LIMIT",
            "quantity": "1",
            "limit_price": "11.25",
            "option_strategy": "SINGLE",
            "side": "BUY",
            "time_in_force": "GTC",
            "entrust_type": "QTY",
            "orders": [
                {
                    "side": "BUY",
                    "quantity": "1",
                    "symbol": "AAPL",
                    "strike_price": "249.0",
                    "init_exp_date": "2025-08-15",
                    "instrument_type": "OPTION",
                    "option_type": "CALL",
                    "market": "US",
                }
            ],
        }
    ]

    # Option order: preview
    res = trade_client.order_v2.preview_option(account_id, option_new_orders)
    _print_result("preview option", res)
    sleep(5)

    # Option order: place
    # This is an optional feature; you can still make a request without setting it.
    custom_headers_map = {"category": Category.US_OPTION.name}
    trade_client.order_v2.add_custom_headers(custom_headers_map)
    try:
        res = trade_client.order_v2.place_option(account_id, option_new_orders)
    finally:
        # Always remove the custom headers, even if the request raised.
        trade_client.order_v2.remove_custom_headers()
    _print_result("place option", res)
    sleep(5)

    # Option order: replace
    option_modify_orders = [
        {
            "client_order_id": client_order_id,
            "quantity": "2",
            "limit_price": "11.3",
            "orders": [
                {
                    "client_order_id": client_order_id,
                    "quantity": "2",
                }
            ],
        }
    ]
    res = trade_client.order_v2.replace_option(account_id, option_modify_orders)
    _print_result("replace option", res)
    sleep(5)

    # Option order: cancel
    res = trade_client.order_v2.cancel_option(account_id, client_order_id)
    _print_result("cancel option", res)


if __name__ == '__main__':
    main()

Market Data Example (HTTP)


from webull.data.common.category import Category
from webull.data.common.timespan import Timespan
from webull.core.client import ApiClient
from webull.data.data_client import DataClient

# Replace every "<...>" placeholder below with your own values before running.
optional_api_endpoint = "<api_endpoint>"
your_app_key = "<your_app_key>"
your_app_secret = "<your_app_secret>"
region_id = "<region_id>"
api_client = ApiClient(your_app_key, your_app_secret, region_id)
# Registers optional_api_endpoint as the endpoint to use for region_id.
api_client.add_endpoint(region_id, optional_api_endpoint)


def _print_result(label, res):
    """Print the JSON body of a successful response, or report the failure.

    A response counts as successful when ``res.status_code == 200``; any
    other status is reported instead of being silently skipped.
    """
    if res.status_code == 200:
        print(label + ':', res.json())
    else:
        print("%s request failed, status_code=%s" % (label, res.status_code))


def main():
    """Query instrument, snapshot, bar, tick and quote data over HTTP."""
    data_client = DataClient(api_client)

    res = data_client.instrument.get_instrument("AAPL", Category.US_STOCK.name)
    _print_result('get_instrument', res)

    res = data_client.market_data.get_snapshot(
        'AAPL', Category.US_STOCK.name, extend_hour_required=True, overnight_required=True)
    _print_result('get_snapshot', res)

    res = data_client.market_data.get_history_bar('AAPL', Category.US_STOCK.name, Timespan.M1.name)
    _print_result('get_history_bar', res)

    res = data_client.market_data.get_batch_history_bar(
        ['AAPL', 'TSLA'], Category.US_STOCK.name, Timespan.M1.name, 1)
    _print_result('get_batch_history_bar', res)

    # Trading sessions passed to the tick request.
    trading_sessions = ["PRE", "RTH", "ATH", "OVN"]
    res = data_client.market_data.get_tick("AAPL", Category.US_STOCK.name, trading_sessions=trading_sessions)
    _print_result('get_tick', res)

    res = data_client.market_data.get_quotes("AAPL", Category.US_STOCK.name, depth=1, overnight_required=True)
    _print_result('get_quotes', res)


if __name__ == '__main__':
    main()

Market Data Example (MQTT, sync)


import logging
import uuid
from logging.handlers import TimedRotatingFileHandler

from webull.data.common.category import Category
from webull.data.common.subscribe_type import SubscribeType
from webull.data.data_streaming_client import DataStreamingClient

# Replace every "<...>" placeholder below with your own values before running.
your_app_key = "<your_app_key>"
your_app_secret = "<your_app_secret>"
optional_api_endpoint = "<api_endpoint>"
optional_quotes_endpoint = "<quotes_endpoint>"
region_id = '<region_id>'

session_id = uuid.uuid4().hex
data_streaming_client = DataStreamingClient(your_app_key, your_app_secret, region_id, session_id,
                                            http_host=optional_api_endpoint,
                                            mqtt_host=optional_quotes_endpoint)


def my_connect_success_func(client, api_client, quotes_session_id):
    """Connect-success callback: log the session id, then subscribe."""
    print("connect success with session_id:%s" % quotes_session_id)
    # subscribe
    symbols = ['00700']
    sub_types = [SubscribeType.QUOTE.name, SubscribeType.SNAPSHOT.name, SubscribeType.TICK.name]
    client.subscribe(symbols, Category.HK_STOCK.name, sub_types)


def my_quotes_message_func(client, topic, quotes):
    """Quotes callback: print each received message with its topic."""
    print("receive message: topic:%s, quotes:%s" % (topic, quotes))


def my_subscribe_success_func(client, api_client, quotes_session_id):
    """Subscribe-success callback: log the session id."""
    print("subscribe success with session_id:%s" % quotes_session_id)


if __name__ == '__main__':
    # set connect success callback func
    data_streaming_client.on_connect_success = my_connect_success_func
    # set quotes receiving callback func
    data_streaming_client.on_quotes_message = my_quotes_message_func
    # set subscribe success callback func
    data_streaming_client.on_subscribe_success = my_subscribe_success_func
    # the sync mode, blocking in current thread
    data_streaming_client.connect_and_loop_forever()

Market Data Example (MQTT, async)



import time
import uuid

from webull.data.common.category import Category
from webull.data.common.subscribe_type import SubscribeType
from webull.data.data_streaming_client import DataStreamingClient


# Replace every "<...>" placeholder below with your own values before running.
your_app_key = "<your_app_key>"
your_app_secret = "<your_app_secret>"
optional_api_endpoint = "<api_endpoint>"
optional_quotes_endpoint = "<quotes_endpoint>"
region_id = '<region_id>'

session_id = uuid.uuid4().hex
data_streaming_client = DataStreamingClient(your_app_key, your_app_secret, region_id, session_id,
                                            http_host=optional_api_endpoint,
                                            mqtt_host=optional_quotes_endpoint)


def my_connect_success_func(client, api_client, quotes_session_id):
    """Connect-success callback: log the session id, then subscribe."""
    print("connect success with session_id:%s" % quotes_session_id)
    # subscribe
    symbols = ['00700']
    sub_types = [SubscribeType.QUOTE.name, SubscribeType.SNAPSHOT.name, SubscribeType.TICK.name]
    client.subscribe(symbols, Category.HK_STOCK.name, sub_types)


def my_quotes_message_func(client, topic, quotes):
    """Quotes callback: print each received message with its topic."""
    print("receive message: topic:%s, quotes:%s" % (topic, quotes))


def my_subscribe_success_func(client, api_client, quotes_session_id):
    """Subscribe-success callback: log the session id."""
    print("subscribe success with session_id:%s" % quotes_session_id)


if __name__ == '__main__':
    # set connect success callback func
    data_streaming_client.on_connect_success = my_connect_success_func
    # set quotes receiving callback func
    data_streaming_client.on_quotes_message = my_quotes_message_func
    # set subscribe success callback func
    data_streaming_client.on_subscribe_success = my_subscribe_success_func

    # the async mode, processing in another thread
    data_streaming_client.connect_and_loop_start()

    ticker = 60
    print("will remove subscription after %s seconds..." % ticker)
    time.sleep(ticker)

    subscribe_success = data_streaming_client.get_subscribe_success()
    # Not used below; kept from the original example to show how the session
    # id can be read back from the client.
    quotes_session_id = data_streaming_client.get_session_id()
    if subscribe_success:
        print("start remove subscription...")
        data_streaming_client.unsubscribe(unsubscribe_all=True)
        print("remove subscription finish")
    else:
        # Use % so the value is interpolated rather than printed after a literal "%s".
        print("Do not remove subscription, subscribe_success:%s" % subscribe_success)

    # Wait `ticker` seconds before subscribing again, reporting progress
    # every `wait_time` seconds.
    start_time = time.time()
    wait_time = 1
    while True:
        elapsed = int(time.time() - start_time)
        if elapsed >= ticker:
            print("Wait completed, start subscribing...")
            break
        print("Waiting {} seconds before subscription... (elapsed {}s / {}s)".format(wait_time, elapsed, ticker))
        time.sleep(wait_time)

    # subscribe
    connect_success = data_streaming_client.get_connect_success()
    if connect_success:
        symbols = ['00700']
        sub_types = [SubscribeType.QUOTE.name, SubscribeType.SNAPSHOT.name, SubscribeType.TICK.name]
        data_streaming_client.subscribe(symbols, Category.HK_STOCK.name, sub_types)
        print("add subscription...")
    else:
        # Use % so the value is interpolated rather than printed after a literal "%s".
        print("Do not add subscription, connect_success:%s" % connect_success)

    print("will stop processing after %s seconds" % ticker)
    time.sleep(ticker)
    data_streaming_client.loop_stop()
    print("processing done")

Sandbox Test Accounts

The test account information for individual API integration is as follows:

| No. | Test Account ID            | Test App Key                     | Test Secret Key                  |
| --- | -------------------------- | -------------------------------- | -------------------------------- |
| 1   | V4H6R3L4VRI33UQ4TGR2NM1VI9 | 4b2b7acd2bf0d30d8aea173fceefa238 | 840b4353a6a31ce3ab91e2f99a510272 |
| 2   | OGG4RRLC6EDE98HI920KRBVSKB | 42bd186fb65ea76de309d69cf12f024e | 29feb64b59d6b1b6b2d2aa8cea8a1b8d |
| 3   | 2DHSQ9B1DMPBFPMPFU2R5SDPB8 | 64fc722617af8b5ebb746f50a910e91f | a268416fc681d438533f9e9316bab576 |

Feedback and Communication

  1. You can contact our staff via the Webull API service email address: webull-api-support@webull.com