2 votos

¿Eficaz manera de persistir estrategia de órdenes en tiempo real a la base de datos?

Antecedentes: Estoy construyendo un sistema de trading para el mercado de criptomonedas con Python, y actualmente tengo problemas sobre cómo guardar efectivamente mis órdenes/trades en tiempo real en el disco, para poder monitorear más fácilmente o hacer análisis posteriores.

El diseño actual es el siguiente:

Para cada activo tengo una clase Strategy, con una clase Position vinculada a ella. La estrategia gestiona una conexión websocket a la bolsa, y cuando la estrategia recibe cualquier actualización de la orden, actualizará su posición. El fragmento de código simple es como:

class Position():
    def __init__():
        self.orders = {}
        ...

    def update(self, order_info):
        ... # process position
        self.orders[order_info['id']] = order_info # update 

class Strategy():
    def __init__():
        self.name = ''
        self.position = Position()

    def on_order_update(self, order_info):
        self.position.update(order_info)

Ahora, todas las órdenes se guardan en memoria en la clase de posición de cada estrategia, y quiero guardarlas también en disco (¿quizás una base de datos?). Lo que tengo en mente es simplemente escribir el estado de la orden en una base de datos cada vez que la estrategia la reciba. Por lo tanto iniciaría una conexión db al inicializar la clase position, y añadiría unas pocas líneas para escribir en la base de datos en su método update.

Sin embargo, me pregunto si hay mejores formas de conseguirlo, y sobre todo me preocupa el rendimiento del sistema de negociación. Estoy haciendo sobre todo el comercio intradía y yo esperaría que cada estrategia para tener alrededor de 1 a 2 órdenes por segundo, y el comercio en un máximo de 100 activos a través de intercambios al mismo tiempo.

Mi enfoque mencionado anteriormente crearía y gestionaría un montón de conexiones a la base de datos (también tendría que ocuparse de las reconexiones). Además, la escritura se ejecuta en el mismo proceso de la estrategia, por lo que podría paralizar la estrategia si la escritura tarda mucho tiempo. Dado que no necesito que los datos de las órdenes estén disponibles instantáneamente (un retraso de segundos no es realmente un problema), ¿tal vez es una mejor idea tener otro proceso para leer la información de las órdenes de todas las estrategias y escribirlas periódicamente en la base de datos? ¿O almacenar en caché los datos en redis primero y luego persistir en el disco?

Pregunta: ¿Cuál es el enfoque más eficaz para mi caso?

Descargo de responsabilidad: Tengo conocimientos muy limitados sobre ingeniería de sistemas comerciales, por lo que cualquier sugerencia o indicación será más que bienvenida.

2voto

Charles Chen Puntos 183

Se me ocurren algunos enfoques.

  1. Deje que el hardware capte todo el tráfico entrante y saliente y filtre los pedidos y las actualizaciones de pedidos. Esto tiene la ventaja de que no debería conllevar ninguna latencia añadida y elimina esta preocupación del código de tu estrategia, pero requiere más hardware, software y trabajo operativo.

  2. Capture todos los paquetes entrantes y salientes y procéselos en un proceso independiente. Esto no requiere ningún cambio en el hardware, pero introduce más software cuyo mantenimiento es necesario.

Las opciones anteriores corresponderían aproximadamente a NoopConsumer() en el código siguiente.

  1. (No probado) Compartir memoria utilizando mmap y añadir sólo los datos que desea compartir de su proceso de estrategia y leer de otro proceso para mantenerse al día.

  2. Escribir de forma asíncrona como se hace a continuación, probablemente sea más rápido si la sobrecarga de escribir es mayor que la de cambiar de contexto, lo cual parece probable pero necesita experimentarse.

  3. Tener bloqueo escribe, tal vez si se puede escribir muy rápido.

Este código se construyó desde cero hoy y no soy un habitual asyncio usuario, por lo que probablemente pueda mejorarse. No obstante, creo que es un buen punto de partida.

"""
A simple experiment to time different ways to save orders

Start the servers with

> python main.py server 5

and start the experiment with

> python main.py client 5

Servers can be reused.

Example output:

Connecting to 5 exchanges with sync writer
Connecting to 8080
Connecting to 8081
Connecting to 8082
Connecting to 8083
Connecting to 8084
Took: 5.667398263s
Took: 5.693023433s
Took: 5.707320206s
Took: 5.716781134s
Took: 5.720934913s
Connecting to 5 exchanges with async writer
Connecting to 8080
Connecting to 8081
Connecting to 8082
Connecting to 8083
Connecting to 8084
Took: 5.684973048s
Took: 5.700082285s
Took: 5.709185018s
Took: 5.713764676s
Took: 5.721333215s
Connecting to 5 exchanges with NoopConsumer
Connecting to 8080
Connecting to 8081
Connecting to 8082
Connecting to 8083
Connecting to 8084
Took: 2.466461159s
Took: 2.553055324s
Took: 2.563424393s
Took: 2.563971055s
Took: 2.60299299s
Connecting to 5 exchanges with SqliteConsumer synced writes
Connecting to 8080
Connecting to 8081
Connecting to 8082
Connecting to 8083
Connecting to 8084
Took: 10.003863398s
Took: 10.026773816s
Took: 10.058823117s
Took: 10.08572986s
Took: 10.12419061s
Connecting to 5 exchanges with SqliteConsumer async writes
Connecting to 8080
Connecting to 8081
Connecting to 8082
Connecting to 8083
Connecting to 8084
Took: 9.547965043s
Took: 9.583894856s
Took: 9.603546447s
Took: 9.623569694s
Took: 9.648144184s
"""

import asyncio
import sqlite3
import sys
import time
import numpy as np
from websockets.server import serve
from websockets import connect

DEBUG = False
# Server
hostname = "localhost"
base_port = 8080
average_wait_time = 2
num_orders = 1000
p = 0.5
speed = 1000
seed = 42

# Client
sync_overhead = 0.002
async_overhead = 0.002

np.random.seed(seed)

def print_settings():
    print(
        f"""Settings:
hostname:       {hostname}
base_port:      {base_port}
num_orders:     {num_orders}
p:              {p}
speed:          {speed}
seed:           {seed}"""
    )

class NoopConsumer:
    """Takes a message and does nothing"""

    def write(self, exchange_id, order_id):
        pass

class Consumer:
    """Consumes and writes a message and adds some overhead"""

    def __init__(self, overhead):
        self.overhead = overhead
        self.messages_received = []

    def write(self, exchange_id, order_id):
        self.messages_received.append(f"{exchange_id}|{order_id}")
        time.sleep(self.overhead)

    async def async_write(self, exchange_id, order_id):
        self.messages_received.append(f"{exchange_id}|{order_id}")
        time.sleep(self.overhead)

class SqliteConsumer:
    """Non-thread safe writer to a sqlite3 in 'test_db.sqlite3'"""

    def __init__(self):
        self.con = sqlite3.connect("test_db.sqlite3")
        self.cur = self.con.cursor()
        self.cur.execute("DROP TABLE IF EXISTS orders")
        self.cur.execute("CREATE TABLE orders(exchange_id int, order_id int)")

    def write(self, exchange_id, order_id):
        self.cur.execute(
            f"INSERT INTO orders (exchange_id, order_id) VALUES ({exchange_id}, {order_id})"
        )
        self.con.commit()

    async def async_write(self, exchange_id, order_id):
        self.cur.execute(
            f"INSERT INTO orders (exchange_id, order_id) VALUES ({exchange_id}, {order_id})"
        )
        self.con.commit()

class Exchange:
    """
    Bare functionality to model an exchange

    An exchange listens on hostname:port and sends exchange_id:order_id over a
    websocket at random intervals.
    """

    def __init__(self, exchange_id, hostname, port, average_wait_time):
        self.exchange_id = exchange_id
        self.hostname = hostname
        self.port = port
        self.wait_times = np.random.poisson(average_wait_time, num_orders)
        print(f"Creating {self.exchange_id} on {self.hostname}:{self.port}")
        print(f"Average wait time param: {average_wait_time}")
        print(f"Average wait time: {self.wait_times.sum() / (speed * num_orders)}")
        print(f"Total wait time: {self.wait_times.sum() / speed}")

    async def order_feed(self, websocket):
        async for message in websocket:
            if message == "start":
                start = time.time_ns()
                send_time = 0
                for i, wait_time in enumerate(self.wait_times):
                    await asyncio.sleep(wait_time / speed)
                    start_send = time.time_ns()
                    await websocket.send(f"{self.exchange_id}:{i}")
                    send_time += time.time_ns() - start_send
                await websocket.send("done")
                print(
                    f"Took {(time.time_ns() - start) / 1e9}s to send all orders on {self.exchange_id}"
                )
                print(f"Total send time: {send_time / 1e9}s")
                print_settings()

    async def run(self):
        async with serve(self.order_feed, self.hostname, self.port):
            await asyncio.Future()

class Client:
    """Connects to an exchange and records some results

    The feed is started with 'start' and stopped when the message 'done' is
    recieved. A coin flip is performed to decide whether the message is saved.
    """

    def __init__(self, port, writer, write_async):
        self.port = port
        self.writer = writer
        self.write_async = write_async

    async def run(self):
        print(f"Connecting to {self.port}")
        async with connect(f"ws://{hostname}:{self.port}") as websocket:
            await websocket.send("start")
            start = time.time_ns()
            async for message in websocket:
                if message == "done":
                    break
                if np.random.choice([True, False]):
                    exchange_id, order_id = message.split(":")
                    if self.write_async:
                        await self.writer.async_write(exchange_id, order_id)
                    else:
                        self.writer.write(exchange_id, order_id)
            print(f"Took: {(time.time_ns() - start) / 1e9}s")

class Strategy:
    """A strategy holds mulitple connections"""

    def __init__(self, clients):
        self.clients = clients

    async def run_all(self):
        async with asyncio.TaskGroup() as tg:
            for client in self.clients:
                tg.create_task(client.run())

if __name__ == "__main__":
    if len(sys.argv) < 3:
        print("Provide either 'server' or 'client' as argument and a count")
    else:
        arg = sys.argv[1]
        num_servers = int(sys.argv[2])

        if arg == "server":
            print_settings()

            async def run_servers(num_servers):
                async with asyncio.TaskGroup() as tg:
                    for i in range(num_servers):
                        exchange = Exchange(
                            i + 1, hostname, base_port + i, average_wait_time
                        )
                        tg.create_task(exchange.run())

            asyncio.run(run_servers(num_servers))
        elif arg == "client":
            print(f"Connecting to {num_servers} exchanges with sync writer")
            clients = [
                Client(base_port + i, Consumer(sync_overhead), False)
                for i in range(num_servers)
            ]
            asyncio.run(Strategy(clients).run_all())
            if DEBUG:
                print("\n".join(strategy.writer.messages_received))

            print(f"Connecting to {num_servers} exchanges with async writer")
            clients = [
                Client(base_port + i, Consumer(async_overhead), True)
                for i in range(num_servers)
            ]
            asyncio.run(Strategy(clients).run_all())
            if DEBUG:
                print("\n".join(strategy.writer.messages_received))

            print(f"Connecting to {num_servers} exchanges with NoopConsumer")
            clients = [
                Client(base_port + i, NoopConsumer(), False) for i in range(num_servers)
            ]
            asyncio.run(Strategy(clients).run_all())
            if DEBUG:
                print("\n".join(strategy.writer.messages_received))

            print(
                f"Connecting to {num_servers} exchanges with SqliteConsumer synced writes"
            )
            clients = [
                Client(base_port + i, SqliteConsumer(), False)
                for i in range(num_servers)
            ]
            asyncio.run(Strategy(clients).run_all())

            print(
                f"Connecting to {num_servers} exchanges with SqliteConsumer async writes"
            )
            clients = [
                Client(base_port + i, SqliteConsumer(), True)
                for i in range(num_servers)
            ]
            asyncio.run(Strategy(clients).run_all())

        else:
            print(f"Unknown arg '{arg}', exiting")

1voto

Katie Puntos 6

Si me permites una sugerencia, QuestDB es una base de datos de series temporales creada exactamente para estos casos de uso, con muchos usuarios en el sector financiero. La ingesta de datos se puede hacer usando cualquier librería compatible con postgresql pero no es realmente eficiente para altas tasas. Para frecuencias más altas, se pueden ingerir datos utilizando el protocolo ILP, que está disponible a través de una librería cliente nativo Python .

Un ejemplo para la ingestión:

from questdb.ingress import Sender

with Sender('localhost', 9009) as sender:
    sender.row(
        'trades',
        symbols={'symbol': 'ETH-USD', 'side': 'buy'},
        columns={'prices': 2615.54 'volume': 0.00044})
    sender.flush()

Las consultas pueden ejecutarse directamente utilizando cualquier biblioteca compatible con postgresql, como psycopg o sqlalchemy. Un ejemplo de gráfico de velas a intervalos de 15 m podría ser

SELECT 
    timestamp,
    first(price) AS open,
    last(price) AS close,
    min(price),
    max(price),
    sum(amount) AS volume
FROM trades
WHERE symbol = 'BTC-USD' AND timestamp > dateadd('d', -1, now())
SAMPLE BY 15m ALIGN TO CALENDAR;

Este ejemplo puede ejecutarse en la instancia de demostración pública https://demo.questdb.io utilizando el conjunto de datos de operaciones públicas, que se actualiza con datos de coinbase cada segundo aproximadamente.

Puede utilizar QuestDB de forma autoalojada (licencia apache 2.0) o como un servicio gestionado utilizando la licencia Nube QuestDB (con un nivel gratuito de 200 $).

Finanhelp.com

FinanHelp es una comunidad para personas con conocimientos de economía y finanzas, o quiere aprender. Puedes hacer tus propias preguntas o resolver las de los demás.

Powered by:

X