Python BITFINEX WebSocket取引① 詳細な解説の続編
import websocket
import requests
import hashlib
import hmac
import json
import time
import os
from threading import Thread
# GLOBAL VARIABLES
channels = {0: 'Bitfinex'}
symbols = []
tickers = {} # [market][bid/ask]
candles = {} # [market][candle1,candle2...]
def update_tickers(data):
global tickers
sym = channels[data[0]][1]
ticker_raw = data[1]
ticker_parsed = {
'bid': ticker_raw[0],
'ask': ticker_raw[2],
'last_price': ticker_raw[6],
'volume': ticker_raw[7],
}
tickers[sym] = ticker_parsed
def update_candles(data):
global candles
def truncate_market(str_data):
# Get market symbol from channel key
col1 = str_data.find(':t')
res = str_data[col1+2:]
return res
def parse_candle(lst_data):
# Get candle dictionary from list
return {
'mts': lst_data[0],
'open': lst_data[1],
'close': lst_data[2],
'high': lst_data[3],
'low': lst_data[4],
'vol': lst_data[5]
}
market = truncate_market(channels[data[0]][1])
# Identify snapshot (list=snapshot, int=update)
if type(data[1][0]) is list:
lst_candles = []
for raw_candle in data[1]:
candle = parse_candle(raw_candle)
lst_candles.append(candle)
candles[market] = lst_candles
elif type(data[1][0]) is int:
raw_candle = data[1]
lst_candles = candles[market]
candle = parse_candle(raw_candle)
if candle['mts'] == candles[market][0]['mts']:
# Update latest candle
lst_candles[0] = candle
candles[market] = lst_candles
elif candle['mts'] > candles[market][0]['mts']:
# Insert new (latest) candle
lst_candles.insert(0, candle)
candles[market] = lst_candles
def print_details():
# interactive function to view tickers and candles
while len(tickers) == 0 or len(candles) == 0:
# wait for tickers to populate
time.sleep(1)
print('Tickers and candles loaded. You may query a symbol now.')
while True:
symbol = input()
symbol = symbol.upper()
if symbol not in symbols:
print('%s not in list of symbols.' %(symbol))
continue
details = tickers[symbol]
print('%s: Bid: %s, Ask: %s, Last Price: %s, Volume: %s'\
%(symbol, details['bid'], details['ask'],\
details['last_price'], details['volume']))
print('%s: currently has (%s) candles, latest candle: %s'\
%(symbol, len(candles[symbol]), str(candles[symbol][0])))
def on_message(ws, message):
global channels, balances, tickers
data = json.loads(message)
# Handle events
if 'event' in data:
if data['event'] == 'info':
pass # ignore info messages
elif data['event'] == 'auth':
if data['status'] == 'OK':
print('API authentication successful')
else:
print(data['status'])
# Capture all subscribed channels
elif data['event'] == 'subscribed':
if data['channel'] == 'ticker':
channels[data['chanId']] = [data['channel'], data['pair']]
elif data['channel'] == 'candles':
channels[data['chanId']] = [data['channel'], data['key']]
# Handle channel data
else:
chan_id = data[0]
if chan_id in channels:
if 'ticker' in channels[chan_id]:
# if channel is for ticker
if data[1] == 'hb':
# Ignore heartbeat messages
pass
else:
# parse ticker and save to memory
Thread(target=update_tickers, args=(data,)).start()
elif 'candles' in channels[chan_id]:
# if channel is for candles
if data[1] == 'hb':
# Ignore heartbeat messages
pass
else:
# parse candle update and save to memory
Thread(target=update_candles, args=(data,)).start()
def on_error(ws, error):
print(error)
def on_close(ws):
print('### API connection closed ###')
os._exit(0)
def on_open(ws):
print('API connected')
for sym in symbols:
sub_tickers = {
'event': 'subscribe',
'channel': 'ticker',
'symbol': sym
}
ws.send(json.dumps(sub_tickers))
sub_candles = {
'event': 'subscribe',
'channel': 'candles',
'key': 'trade:15m:t' + sym
}
ws.send(json.dumps(sub_candles))
# start printing the books
Thread(target=print_details).start()
def connect_api():
global ws
websocket.enableTrace(False)
ws = websocket.WebSocketApp('wss://api.bitfinex.com/ws/2',
on_message = on_message,
on_error = on_error,
on_close = on_close,
on_open = on_open)
ws.run_forever()
# load USD tickers
res = requests.get("https://api.bitfinex.com/v1/symbols")
all_sym = json.loads(res.content)
for x in all_sym:
if "usd" in x:
symbols.append(x.upper())
print('Found (%s) USD symbols' %(len(symbols)))
# initialize api connection
connect_api()
私は何を学びますか
- 複数のティッカーおよびキャンドルチャネルにサブスクライブする
- メモリ内のティッカーとキャンドルの状態の維持
- マルチスレッドを使用してWebSocketの待ち時間を短縮する
要件
- Python 3.6+
- 依存関係:
websocket-client
- アクティブなインターネット接続
困難
中級
チュートリアルコンテンツ
このチュートリアルに従うには、BitfinexAPIへの基本的なWebSocket接続をセットアップする方法をすでに知っている必要があります。このシリーズの前のチュートリアルに従うことで、これを行う方法を学ぶことができます。
このチュートリアルでは、複数のティッカーチャネルとキャンドルチャネルをサブスクライブし、それらの状態を低レイテンシでメモリに維持する方法を学習します。Pythonのマルチスレッド機能は、操作を同期的に実行できるようにすることで、レイテンシーを削減するのに役立ちます。
on_message()関数によって作成されたレイテンシのボトルネック
websocket-client
Pythonモジュールを使用してビットフィネックスのAPIへの高速データストリームの接続を作成すると、レイテンシが発生する可能性のある潜在的なボトルネックが発生します。数百のオーダーブックやキャンドルなど、多数のデータストリームをサブスクライブする戦略またはスクリプトの場合、更新の受信と処理にかかる時間をできるだけ短くする必要があります。
すべての更新はon_message
関数で受信され、非同期ではないため、その関数で遅延が発生すると、他のメッセージの処理が遅延します。これにより、古い状態の市場データがメモリに保持される可能性があります。
このチュートリアルでは、より大規模な取引システムの一部として、レイテンシーを削減し、この機能のパフォーマンスを向上させるために使用できるさまざまな最適化について説明します。
新しいスレッドを使用してメッセージを処理する
2つの関数を使用して、ろうそくとティッカーの更新の解析を処理し、呼び出されるたびに新しいスレッドでそれらを開始します。そのようです:
つまり、メッセージ関数は新しいスレッドを開始し、それ自体を解放してさらに多くのメッセージを表示するだけで済みますが、解析プロセスやその他の関連プロセスは、すべて同じ変数(candles
およびtickers
)を更新する個々のスレッドで管理できます。
以下は、このチュートリアルのスクリプトで使用される2つの関数のコードです。
update_tickers()
<div class="hcb_wrap"><pre class="prism undefined-numbers lang-python" data-lang="Python"><code>
def update_tickers(data):
global tickers
sym = channels[data[0]][1]
ticker_raw = data[1]
ticker_parsed = {
'bid': ticker_raw[0],
'ask': ticker_raw[2],
'last_price': ticker_raw[6],
'volume': ticker_raw[7],
}
tickers[sym] = ticker_parsed
この関数は、入力を受け取り、そこからティッカー情報を解析してからtickers
、シンボルをキーとして変数に保存します。
update_candles()
def update_candles(data):
global candles
def truncate_market(str_data):
# Get market symbol from channel key
col1 = str_data.find(':t')
res = str_data[col1+2:]
return res
def parse_candle(lst_data):
# Get candle dictionary from list
return {
'mts': lst_data[0],
'open': lst_data[1],
'close': lst_data[2],
'high': lst_data[3],
'low': lst_data[4],
'vol': lst_data[5]
}
market = truncate_market(channels[data[0]][1])
# Identify snapshot (list=snapshot, int=update)
if type(data[1][0]) is list:
lst_candles = []
for raw_candle in data[1]:
candle = parse_candle(raw_candle)
lst_candles.append(candle)
candles[market] = lst_candles
elif type(data[1][0]) is int:
raw_candle = data[1]
lst_candles = candles[market]
candle = parse_candle(raw_candle)
if candle['mts'] == candles[market][0]['mts']:
# Update latest candle
lst_candles[0] = candle
candles[market] = lst_candles
elif candle['mts'] > candles[market][0]['mts']:
# Insert new (latest) candle
lst_candles.insert(0, candle)
candles[market] = lst_candles
この関数は、candlesチャネルからメッセージを受け取り、それを解析して、次のいずれかを実行します。
- 初期スナップショット:キャンドルのリスト全体を保存します
- 最新のキャンドルに更新:新しい値をメモリに保存します
- 新しいキャンドルの追加:新しいキャンドルエントリを作成し、メモリに挿入します
2つの入れ子関数でtruncate_market
あり、parse_candle
マーケットシンボルを取得し、個々のキャンドルを操作しやすい形式に解析するのに役立ちます。
少ないコードが望ましい
on_message()
チュートリアルのスクリプトで関数全体がどのように表示されるかを次に示します。
def on_message(ws, message):
global channels, balances, tickers
data = json.loads(message)
# Handle events
if 'event' in data:
if data['event'] == 'info':
pass # ignore info messages
elif data['event'] == 'auth':
if data['status'] == 'OK':
print('API authentication successful')
else:
print(data['status'])
# Capture all subscribed channels
elif data['event'] == 'subscribed':
if data['channel'] == 'ticker':
channels[data['chanId']] = [data['channel'], data['pair']]
elif data['channel'] == 'candles':
channels[data['chanId']] = [data['channel'], data['key']]
# Handle channel data
else:
chan_id = data[0]
if chan_id in channels:
if 'ticker' in channels[chan_id]:
# if channel is for ticker
if data[1] == 'hb':
# Ignore heartbeat messages
pass
else:
# parse ticker and save to memory
Thread(target=update_tickers, args=(data,)).start()
elif 'candles' in channels[chan_id]:
# if channel is for candles
if data[1] == 'hb':
# Ignore heartbeat messages
pass
else:
# parse candle update and save to memory
Thread(target=update_candles, args=(data,)).start()
ろうそくやティッカーの解析などのプロセスをこの関数から取り除くことで、スクリプトはメッセージをすばやく処理できます。ここでの一般的なガイドラインは、コードを可能な限り軽量に保つことです。このような技術的分析や戦略の実装など、他の計算は、すべての参照主な変数(例えば、別のスレッドにする必要がありますtickers
し、candles
多くの異なるスレッドによって頻繁かつリアルタイムに更新されているこのチュートリアルでは)、のWebSocketメッセージをオフ生み出しました。
このチュートリアルのコードをさらに最適化するには、on_message()
関数で見つかったコードのほとんどを、独自の新しいスレッドで開いた別の関数に委任します。メッセージの内容の種類の確認、チャネルIDの抽出などは、別の機能で処理できます。簡単にするために、この例では実装していませんが、可能です。
複数のティッカーチャネルを購読する
このチュートリアルのスクリプトを構成する残りのコードには、次のものが含まれます。
def on_open(ws):
print('API connected')
for sym in symbols:
sub_tickers = {
'event': 'subscribe',
'channel': 'ticker',
'symbol': sym
}
ws.send(json.dumps(sub_tickers))
sub_candles = {
'event': 'subscribe',
'channel': 'candles',
'key': 'trade:15m:t' + sym
}
ws.send(json.dumps(sub_candles))
# start printing the books
Thread(target=print_details).start()
これは、選択されたすべてのシンボルのティッカーとシンボルをサブスクライブします。例として、このチュートリアルに添付されているスクリプトは、ビットフィネックス取引所で利用可能なすべてのシンボルのリストを取得し、結果をフィルタリングして米ドルのペアのみを取得します。このためのコードは以下のとおりです。
# load USD tickers
res = requests.get("https://api.bitfinex.com/v1/symbols")
all_sym = json.loads(res.content)
for x in all_sym:
if "usd" in x:
symbols.append(x.upper())
print('Found (%s) USD symbols' %(len(symbols)))
インタラクティブツール
バックグラウンドで実行されているインタラクティブ機能があり、シンボルの最新のティッカー情報とキャンドルデータを照会するために使用できます。コードは次のとおりです。
def print_details():
# interactive function to view tickers and candles
while len(tickers) == 0 or len(candles) == 0:
# wait for tickers to populate
time.sleep(1)
print('Tickers and candles loaded. You may query a symbol now.')
while True:
symbol = input()
symbol = symbol.upper()
if symbol not in symbols:
print('%s not in list of symbols.' %(symbol))
continue
details = tickers[symbol]
print('%s: Bid: %s, Ask: %s, Last Price: %s, Volume: %s'\
%(symbol, details['bid'], details['ask'],\
details['last_price'], details['volume']))
print('%s: currently has (%s) candles, latest candle: %s'\
%(symbol, len(candles[symbol]), str(candles[symbol][0])))
スクリプトを初期化するために、これが最後に追加されます。
# initialize api connection
connect_api()
スクリプトの実行
print_details()
関数があるため、スクリプトはインタラクティブです。すべてのデータがロードされたら、シンボル名(BTCUSDなど)を入力すると、以下に示すように、現在のティッカーと最新のキャンドルデータが出力されます。
インタラクティブツールの使用
- スクリプトを実行した後、スクリプトが次のようなステートメントを出力するのを待ちます
Tickers and candles loaded. You may query a symbol now.
- 記号名を入力してEnterキーを押します。例:btcusd(大文字と小文字は区別されません)またはltcusd
- 現在のティッカーの詳細と最新のキャンドルが印刷されます
コメント