Python BITFINEX WebSocket取引② マルチモード(threading)の解説

Python BITFINEX WebSocket取引① 詳細な解説の続編

import websocket
import requests
import hashlib
import hmac
import json
import time
import os
from threading import Thread

# GLOBAL VARIABLES
channels = {0: 'Bitfinex'}
symbols = []
tickers = {}    # [market][bid/ask]
candles = {}    # [market][candle1,candle2...]

def update_tickers(data):
    global tickers
    sym = channels[data[0]][1]
    ticker_raw = data[1]
    ticker_parsed = {
        'bid': ticker_raw[0],
        'ask': ticker_raw[2],
        'last_price': ticker_raw[6],
        'volume': ticker_raw[7],
    }
    tickers[sym] = ticker_parsed

def update_candles(data):
    global candles
    def truncate_market(str_data):
        # Get market symbol from channel key
        col1 = str_data.find(':t')
        res = str_data[col1+2:]
        return res
    def parse_candle(lst_data):
        # Get candle dictionary from list
        return {
            'mts': lst_data[0],
            'open': lst_data[1],
            'close': lst_data[2],
            'high': lst_data[3],
            'low': lst_data[4],
            'vol': lst_data[5]
        }

    market = truncate_market(channels[data[0]][1])
    # Identify snapshot (list=snapshot, int=update)
    if type(data[1][0]) is list: 
        lst_candles = []
        for raw_candle in data[1]:
            candle = parse_candle(raw_candle)
            lst_candles.append(candle)
        candles[market] = lst_candles
    elif type(data[1][0]) is int:
        raw_candle = data[1]
        lst_candles = candles[market]
        candle = parse_candle(raw_candle)
        if candle['mts'] == candles[market][0]['mts']:
            # Update latest candle
            lst_candles[0] = candle
            candles[market] = lst_candles
        elif candle['mts'] > candles[market][0]['mts']:
            # Insert new (latest) candle
            lst_candles.insert(0, candle)
            candles[market] = lst_candles

def print_details():
    # interactive function to view tickers and candles
    while len(tickers) == 0 or len(candles) == 0:
        # wait for tickers to populate
        time.sleep(1)
    print('Tickers and candles loaded. You may query a symbol now.')
    while True:
        symbol = input()
        symbol = symbol.upper()
        if symbol not in symbols:
            print('%s not in list of symbols.' %(symbol))
            continue
        details = tickers[symbol]
        print('%s:  Bid: %s, Ask: %s, Last Price: %s, Volume: %s'\
            %(symbol, details['bid'], details['ask'],\
            details['last_price'], details['volume']))
        print('%s: currently has (%s) candles, latest candle: %s'\
            %(symbol, len(candles[symbol]), str(candles[symbol][0])))

def on_message(ws, message):
    global channels, balances, tickers
    data = json.loads(message)
    # Handle events
    if 'event' in data:
        if data['event'] == 'info':
            pass # ignore info messages
        elif data['event'] == 'auth':
            if data['status'] == 'OK':
                print('API authentication successful')
            else:
                print(data['status'])
        # Capture all subscribed channels
        elif data['event'] == 'subscribed':
            if data['channel'] == 'ticker':
                channels[data['chanId']] = [data['channel'], data['pair']]
            elif data['channel'] == 'candles':
                channels[data['chanId']] = [data['channel'], data['key']]
    # Handle channel data
    else:
        chan_id = data[0]
        if chan_id in channels:
            if 'ticker' in channels[chan_id]:
                # if channel is for ticker
                if data[1] == 'hb':
                    # Ignore heartbeat messages
                    pass
                else:
                    # parse ticker and save to memory
                    Thread(target=update_tickers, args=(data,)).start()
            elif 'candles' in channels[chan_id]:
                # if channel is for candles
                if data[1] == 'hb':
                    # Ignore heartbeat messages
                    pass
                else:
                    # parse candle update and save to memory
                    Thread(target=update_candles, args=(data,)).start()

def on_error(ws, error):
    print(error)

def on_close(ws):
    print('### API connection closed ###')
    os._exit(0)

def on_open(ws):
    print('API connected')
    for sym in symbols:
        sub_tickers = {
            'event': 'subscribe',
            'channel': 'ticker',
            'symbol': sym
        }
        ws.send(json.dumps(sub_tickers))
        sub_candles = {
            'event': 'subscribe',
            'channel': 'candles',
            'key': 'trade:15m:t' + sym
        }
        ws.send(json.dumps(sub_candles))
    # start printing the books
    Thread(target=print_details).start()

def connect_api():
    global ws
    websocket.enableTrace(False)
    ws = websocket.WebSocketApp('wss://api.bitfinex.com/ws/2',
                            on_message = on_message,
                            on_error = on_error,
                            on_close = on_close,
                            on_open = on_open)
    ws.run_forever()

# load USD tickers
res = requests.get("https://api.bitfinex.com/v1/symbols")
all_sym = json.loads(res.content)
for x in all_sym:
    if "usd" in x:
        symbols.append(x.upper())
print('Found (%s) USD symbols' %(len(symbols)))

# initialize api connection
connect_api()

私は何を学びますか

  • 複数のティッカーおよびキャンドルチャネルにサブスクライブする
  • メモリ内のティッカーとキャンドルの状態の維持
  • マルチスレッドを使用してWebSocketの待ち時間を短縮する

要件

  • Python 3.6+
  • 依存関係:
    • websocket-client
  • アクティブなインターネット接続

困難

中級

チュートリアルコンテンツ

このチュートリアルに従うには、BitfinexAPIへの基本的なWebSocket接続をセットアップする方法をすでに知っている必要があります。このシリーズの前のチュートリアルに従うことで、これを行う方法を学ぶことができます。

このチュートリアルでは、複数のティッカーチャネルとキャンドルチャネルをサブスクライブし、それらの状態を低レイテンシでメモリに維持する方法を学習します。Pythonのマルチスレッド機能は、操作を同期的に実行できるようにすることで、レイテンシーを削減するのに役立ちます。

on_message()関数によって作成されたレイテンシのボトルネック

websocket-clientPythonモジュールを使用してビットフィネックスのAPIへの高速データストリームの接続を作成すると、レイテンシが発生する可能性のある潜在的なボトルネックが発生します。数百のオーダーブックやキャンドルなど、多数のデータストリームをサブスクライブする戦略またはスクリプトの場合、更新の受信と処理にかかる時間をできるだけ短くする必要があります。

すべての更新はon_message関数で受信され、非同期ではないため、その関数で遅延が発生すると、他のメッセージの処理が遅延します。これにより、古い状態の市場データがメモリに保持される可能性があります。

このチュートリアルでは、より大規模な取引システムの一部として、レイテンシーを削減し、この機能のパフォーマンスを向上させるために使用できるさまざまな最適化について説明します。

新しいスレッドを使用してメッセージを処理する

2つの関数を使用して、ろうそくとティッカーの更新の解析を処理し、呼び出されるたびに新しいスレッドでそれらを開始します。そのようです:

つまり、メッセージ関数は新しいスレッドを開始し、それ自体を解放してさらに多くのメッセージを表示するだけで済みますが、解析プロセスやその他の関連プロセスは、すべて同じ変数(candlesおよびtickers)を更新する個々のスレッドで管理できます。

以下は、このチュートリアルのスクリプトで使用される2つの関数のコードです。

update_tickers()

<div class="hcb_wrap"><pre class="prism undefined-numbers lang-python" data-lang="Python"><code>
def update_tickers(data):
    global tickers
    sym = channels[data[0]][1]
    ticker_raw = data[1]
    ticker_parsed = {
        'bid': ticker_raw[0],
        'ask': ticker_raw[2],
        'last_price': ticker_raw[6],
        'volume': ticker_raw[7],
    }
    tickers[sym] = ticker_parsed

この関数は、入力を受け取り、そこからティッカー情報を解析してからtickers、シンボルをキーとして変数に保存します。

update_candles()

def update_candles(data):
    global candles
    def truncate_market(str_data):
        # Get market symbol from channel key
        col1 = str_data.find(':t')
        res = str_data[col1+2:]
        return res
    def parse_candle(lst_data):
        # Get candle dictionary from list
        return {
            'mts': lst_data[0],
            'open': lst_data[1],
            'close': lst_data[2],
            'high': lst_data[3],
            'low': lst_data[4],
            'vol': lst_data[5]
        }

    market = truncate_market(channels[data[0]][1])
    # Identify snapshot (list=snapshot, int=update)
    if type(data[1][0]) is list: 
        lst_candles = []
        for raw_candle in data[1]:
            candle = parse_candle(raw_candle)
            lst_candles.append(candle)
        candles[market] = lst_candles
    elif type(data[1][0]) is int:
        raw_candle = data[1]
        lst_candles = candles[market]
        candle = parse_candle(raw_candle)
        if candle['mts'] == candles[market][0]['mts']:
            # Update latest candle
            lst_candles[0] = candle
            candles[market] = lst_candles
        elif candle['mts'] > candles[market][0]['mts']:
            # Insert new (latest) candle
            lst_candles.insert(0, candle)
            candles[market] = lst_candles

この関数は、candlesチャネルからメッセージを受け取り、それを解析して、次のいずれかを実行します。

  • 初期スナップショット:キャンドルのリスト全体を保存します
  • 最新のキャンドルに更新:新しい値をメモリに保存します
  • 新しいキャンドルの追加:新しいキャンドルエントリを作成し、メモリに挿入します

2つの入れ子関数でtruncate_marketあり、parse_candleマーケットシンボルを取得し、個々のキャンドルを操作しやすい形式に解析するのに役立ちます。

少ないコードが望ましい

on_message()チュートリアルのスクリプトで関数全体がどのように表示されるかを次に示します。

def on_message(ws, message):
    global channels, balances, tickers
    data = json.loads(message)
    # Handle events
    if 'event' in data:
        if data['event'] == 'info':
            pass # ignore info messages
        elif data['event'] == 'auth':
            if data['status'] == 'OK':
                print('API authentication successful')
            else:
                print(data['status'])
        # Capture all subscribed channels
        elif data['event'] == 'subscribed':
            if data['channel'] == 'ticker':
                channels[data['chanId']] = [data['channel'], data['pair']]
            elif data['channel'] == 'candles':
                channels[data['chanId']] = [data['channel'], data['key']]
    # Handle channel data
    else:
        chan_id = data[0]
        if chan_id in channels:
            if 'ticker' in channels[chan_id]:
                # if channel is for ticker
                if data[1] == 'hb':
                    # Ignore heartbeat messages
                    pass
                else:
                    # parse ticker and save to memory
                    Thread(target=update_tickers, args=(data,)).start()
            elif 'candles' in channels[chan_id]:
                # if channel is for candles
                if data[1] == 'hb':
                    # Ignore heartbeat messages
                    pass
                else:
                    # parse candle update and save to memory
                    Thread(target=update_candles, args=(data,)).start()

ろうそくやティッカーの解析などのプロセスをこの関数から取り除くことで、スクリプトはメッセージをすばやく処理できます。ここでの一般的なガイドラインは、コードを可能な限り軽量に保つことです。このような技術的分析や戦略の実装など、他の計算は、すべての参照主な変数(例えば、別のスレッドにする必要がありますtickersし、candles多くの異なるスレッドによって頻繁かつリアルタイムに更新されているこのチュートリアルでは)、のWebSocketメッセージをオフ生み出しました。

このチュートリアルのコードをさらに最適化するには、on_message()関数で見つかったコードのほとんどを、独自の新しいスレッドで開いた別の関数に委任します。メッセージの内容の種類の確認、チャネルIDの抽出などは、別の機能で処理できます。簡単にするために、この例では実装していませんが、可能です。

複数のティッカーチャネルを購読する

このチュートリアルのスクリプトを構成する残りのコードには、次のものが含まれます。

def on_open(ws):
    print('API connected')
    for sym in symbols:
        sub_tickers = {
            'event': 'subscribe',
            'channel': 'ticker',
            'symbol': sym
        }
        ws.send(json.dumps(sub_tickers))
        sub_candles = {
            'event': 'subscribe',
            'channel': 'candles',
            'key': 'trade:15m:t' + sym
        }
        ws.send(json.dumps(sub_candles))
    # start printing the books
    Thread(target=print_details).start()

これは、選択されたすべてのシンボルのティッカーとシンボルをサブスクライブします。例として、このチュートリアルに添付されているスクリプトは、ビットフィネックス取引所で利用可能なすべてのシンボルのリストを取得し、結果をフィルタリングして米ドルのペアのみを取得します。このためのコードは以下のとおりです。

# load USD tickers
res = requests.get("https://api.bitfinex.com/v1/symbols")
all_sym = json.loads(res.content)
for x in all_sym:
    if "usd" in x:
        symbols.append(x.upper())
print('Found (%s) USD symbols' %(len(symbols)))

インタラクティブツール

バックグラウンドで実行されているインタラクティブ機能があり、シンボルの最新のティッカー情報とキャンドルデータを照会するために使用できます。コードは次のとおりです。

def print_details():
    # interactive function to view tickers and candles
    while len(tickers) == 0 or len(candles) == 0:
        # wait for tickers to populate
        time.sleep(1)
    print('Tickers and candles loaded. You may query a symbol now.')
    while True:
        symbol = input()
        symbol = symbol.upper()
        if symbol not in symbols:
            print('%s not in list of symbols.' %(symbol))
            continue
        details = tickers[symbol]
        print('%s:  Bid: %s, Ask: %s, Last Price: %s, Volume: %s'\
            %(symbol, details['bid'], details['ask'],\
            details['last_price'], details['volume']))
        print('%s: currently has (%s) candles, latest candle: %s'\
            %(symbol, len(candles[symbol]), str(candles[symbol][0])))

スクリプトを初期化するために、これが最後に追加されます。

# initialize api connection
connect_api()

スクリプトの実行

print_details()関数があるため、スクリプトはインタラクティブです。すべてのデータがロードされたら、シンボル名(BTCUSDなど)を入力すると、以下に示すように、現在のティッカーと最新のキャンドルデータが出力されます。

screencast(1).gif

インタラクティブツールの使用

  • スクリプトを実行した後、スクリプトが次のようなステートメントを出力するのを待ちます Tickers and candles loaded. You may query a symbol now.
  • 記号名を入力してEnterキーを押します。例:btcusd(大文字と小文字は区別されません)またはltcusd
  • 現在のティッカーの詳細と最新のキャンドルが印刷されます

コメント

タイトルとURLをコピーしました