Se me ocurren algunos enfoques.
-
Deje que el hardware capte todo el tráfico entrante y saliente y filtre los pedidos y las actualizaciones de pedidos. Esto tiene la ventaja de que no debería añadir ninguna latencia y elimina esta preocupación del código de su estrategia, pero requiere más hardware, más software y más trabajo operativo.
-
Capture todos los paquetes entrantes y salientes y procéselos en un proceso independiente. Esto no requiere ningún cambio en el hardware, pero introduce más software que hay que mantener.
Las dos opciones anteriores corresponderían aproximadamente a `NoopConsumer()` en el código siguiente.
-
(No probado) Compartir memoria utilizando `mmap`: el proceso de la estrategia añade solo los datos que desea compartir y otro proceso los lee para mantenerse al día.
-
Escribir de forma asíncrona, como se hace a continuación. Probablemente sea más rápido si la sobrecarga de escribir es mayor que la del cambio de contexto, lo cual parece probable, pero habría que comprobarlo experimentalmente.
-
Usar escrituras bloqueantes, lo cual puede bastar si se puede escribir muy rápido.
Este código se escribió desde cero hoy y no soy un usuario habitual de `asyncio`, por lo que probablemente pueda mejorarse. No obstante, creo que es un buen punto de partida.
"""
A simple experiment to time different ways to save orders
Start the servers with
> python main.py server 5
and start the experiment with
> python main.py client 5
Servers can be reused.
Example output:
Connecting to 5 exchanges with sync writer
Connecting to 8080
Connecting to 8081
Connecting to 8082
Connecting to 8083
Connecting to 8084
Took: 5.667398263s
Took: 5.693023433s
Took: 5.707320206s
Took: 5.716781134s
Took: 5.720934913s
Connecting to 5 exchanges with async writer
Connecting to 8080
Connecting to 8081
Connecting to 8082
Connecting to 8083
Connecting to 8084
Took: 5.684973048s
Took: 5.700082285s
Took: 5.709185018s
Took: 5.713764676s
Took: 5.721333215s
Connecting to 5 exchanges with NoopConsumer
Connecting to 8080
Connecting to 8081
Connecting to 8082
Connecting to 8083
Connecting to 8084
Took: 2.466461159s
Took: 2.553055324s
Took: 2.563424393s
Took: 2.563971055s
Took: 2.60299299s
Connecting to 5 exchanges with SqliteConsumer synced writes
Connecting to 8080
Connecting to 8081
Connecting to 8082
Connecting to 8083
Connecting to 8084
Took: 10.003863398s
Took: 10.026773816s
Took: 10.058823117s
Took: 10.08572986s
Took: 10.12419061s
Connecting to 5 exchanges with SqliteConsumer async writes
Connecting to 8080
Connecting to 8081
Connecting to 8082
Connecting to 8083
Connecting to 8084
Took: 9.547965043s
Took: 9.583894856s
Took: 9.603546447s
Took: 9.623569694s
Took: 9.648144184s
"""
import asyncio
import sqlite3
import sys
import time
import numpy as np
from websockets.server import serve
from websockets import connect
# Gates the debug dump of recorded messages in client mode.
DEBUG = False
# Server
hostname = "localhost"  # host the exchanges are served on and the clients connect to
base_port = 8080  # exchange/client number i uses port base_port + i
average_wait_time = 2  # mean handed to np.random.poisson when drawing wait times
num_orders = 1000  # number of wait times drawn per exchange (one order sent per wait)
p = 0.5  # only reported by print_settings() in the code visible here
speed = 1000  # each drawn wait time is divided by this before sleeping
seed = 42  # seed for NumPy's global random state
# Client
sync_overhead = 0.002  # per-write overhead (seconds) for the sync Consumer run
async_overhead = 0.002  # per-write overhead (seconds) for the async Consumer run
# Seed NumPy's global RNG, which the Poisson draws and the coin flips below use.
np.random.seed(seed)
def print_settings():
    """Print the module-level experiment settings, one ``name: value`` per line."""
    report_lines = (
        "Settings:",
        f"hostname: {hostname}",
        f"base_port: {base_port}",
        f"num_orders: {num_orders}",
        f"p: {p}",
        f"speed: {speed}",
        f"seed: {seed}",
    )
    # A single print keeps the report as one newline-separated string.
    print("\n".join(report_lines))
class NoopConsumer:
    """Consumer that accepts a message and discards it.

    Exposes the same ``write`` / ``async_write`` pair as the other consumers
    in this file so it can be used with either write mode.
    """

    def write(self, exchange_id, order_id):
        """Accept ``exchange_id`` and ``order_id`` and do nothing."""

    async def async_write(self, exchange_id, order_id):
        """Awaitable counterpart of :meth:`write`; also does nothing."""
class Consumer:
    """Record each message in memory and simulate a fixed write overhead.

    Args:
        overhead: Seconds to wait after recording each message.

    Attributes are plain instance state: ``overhead`` is the configured wait
    and ``messages_received`` is the list of ``"exchange_id|order_id"``
    strings recorded so far, in arrival order.
    """

    def __init__(self, overhead):
        self.overhead = overhead
        self.messages_received = []

    def _record(self, exchange_id, order_id):
        """Append the message to ``messages_received`` in ``a|b`` form."""
        self.messages_received.append(f"{exchange_id}|{order_id}")

    def write(self, exchange_id, order_id):
        """Record the message, then block the calling thread for ``overhead``."""
        self._record(exchange_id, order_id)
        time.sleep(self.overhead)

    async def async_write(self, exchange_id, order_id):
        """Record the message, then wait ``overhead`` without blocking the loop.

        ``asyncio.sleep`` suspends only this coroutine, so other tasks on the
        same event loop keep running during the simulated write. A blocking
        ``time.sleep`` here would stall every task and make this path behave
        exactly like :meth:`write`.
        """
        self._record(exchange_id, order_id)
        await asyncio.sleep(self.overhead)
class SqliteConsumer:
    """Non-thread-safe writer that stores orders in a SQLite database.

    On construction the ``orders`` table is dropped if present and recreated,
    so each instance starts from an empty table.

    Args:
        path: Database location passed to ``sqlite3.connect``. Defaults to
            ``"test_db.sqlite3"``.
    """

    # Placeholders let sqlite3 bind the values instead of splicing message
    # text (which comes straight off the websocket) into the SQL statement.
    _INSERT_SQL = "INSERT INTO orders (exchange_id, order_id) VALUES (?, ?)"

    def __init__(self, path="test_db.sqlite3"):
        self.con = sqlite3.connect(path)
        self.cur = self.con.cursor()
        self.cur.execute("DROP TABLE IF EXISTS orders")
        self.cur.execute("CREATE TABLE orders(exchange_id int, order_id int)")

    def write(self, exchange_id, order_id):
        """Insert one order row and commit immediately."""
        self.cur.execute(self._INSERT_SQL, (exchange_id, order_id))
        self.con.commit()

    async def async_write(self, exchange_id, order_id):
        """Coroutine wrapper around :meth:`write`.

        NOTE: the sqlite3 calls are synchronous, so this still blocks the
        event loop for the duration of the insert and commit.
        """
        self.write(exchange_id, order_id)
class Exchange:
    """Minimal stand-in for an exchange.

    Serves a websocket on ``hostname:port``. When a client sends ``"start"``
    it streams ``exchange_id:order_index`` messages separated by pre-drawn
    random waits, followed by a final ``"done"`` message.
    """

    def __init__(self, exchange_id, hostname, port, average_wait_time):
        """Store the connection details and draw the per-order wait times."""
        self.exchange_id, self.hostname, self.port = exchange_id, hostname, port
        self.wait_times = np.random.poisson(average_wait_time, num_orders)

        total_wait = self.wait_times.sum()
        print(f"Creating {self.exchange_id} on {self.hostname}:{self.port}")
        print(f"Average wait time param: {average_wait_time}")
        print(f"Average wait time: {total_wait / (speed * num_orders)}")
        print(f"Total wait time: {total_wait / speed}")

    async def order_feed(self, websocket):
        """Handle one connection, streaming all orders for each ``"start"``."""
        async for request in websocket:
            if request != "start":
                # Any other message is ignored.
                continue

            began_ns = time.time_ns()
            sending_ns = 0  # time spent inside websocket.send only
            for order_index, pause in enumerate(self.wait_times):
                await asyncio.sleep(pause / speed)
                before_send_ns = time.time_ns()
                await websocket.send(f"{self.exchange_id}:{order_index}")
                sending_ns += time.time_ns() - before_send_ns
            await websocket.send("done")

            elapsed_ns = time.time_ns() - began_ns
            print(
                f"Took {elapsed_ns / 1e9}s to send all orders on {self.exchange_id}"
            )
            print(f"Total send time: {sending_ns / 1e9}s")
            print_settings()

    async def run(self):
        """Serve ``order_feed`` on the configured address until cancelled."""
        server = serve(self.order_feed, self.hostname, self.port)
        async with server:
            # Awaiting a future that is never resolved keeps the server up.
            await asyncio.Future()
class Client:
    """Connect to one exchange and optionally persist the orders it sends.

    The feed is started by sending ``"start"`` and ends when the message
    ``"done"`` is received. For every other message a coin flip decides
    whether it is handed to the writer.
    """

    def __init__(self, port, writer, write_async):
        """Remember the port, the writer and whether to use its async API."""
        self.port = port
        self.writer = writer
        self.write_async = write_async

    async def run(self):
        """Consume the whole feed and print how long it took."""
        print(f"Connecting to {self.port}")
        uri = f"ws://{hostname}:{self.port}"
        async with connect(uri) as websocket:
            await websocket.send("start")
            began_ns = time.time_ns()
            async for message in websocket:
                if message == "done":
                    break
                if not np.random.choice([True, False]):
                    # Coin flip came up "skip": do not save this message.
                    continue
                exchange_id, order_id = message.split(":")
                if self.write_async:
                    await self.writer.async_write(exchange_id, order_id)
                else:
                    self.writer.write(exchange_id, order_id)
            elapsed_ns = time.time_ns() - began_ns
            print(f"Took: {elapsed_ns / 1e9}s")
class Strategy:
"""A strategy holds mulitple connections"""
def __init__(self, clients):
self.clients = clients
async def run_all(self):
async with asyncio.TaskGroup() as tg:
for client in self.clients:
tg.create_task(client.run())
async def _run_servers(num_servers):
    """Create ``num_servers`` exchanges on consecutive ports and serve them."""
    async with asyncio.TaskGroup() as tg:
        for i in range(num_servers):
            exchange = Exchange(i + 1, hostname, base_port + i, average_wait_time)
            tg.create_task(exchange.run())


def _run_experiment(label, make_writer, write_async, num_servers):
    """Run one timed client experiment against ``num_servers`` exchanges.

    Args:
        label: Text appended to the "Connecting to N exchanges with" banner.
        make_writer: Zero-argument callable returning a fresh writer; it is
            called once per client so every client gets its own writer.
        write_async: Whether the clients call the writer's ``async_write``.
        num_servers: Number of exchanges (and clients) to use.
    """
    print(f"Connecting to {num_servers} exchanges with {label}")
    clients = [
        Client(base_port + i, make_writer(), write_async)
        for i in range(num_servers)
    ]
    asyncio.run(Strategy(clients).run_all())
    if DEBUG:
        # The recorded messages live on each client's writer; writers that do
        # not record anything (no ``messages_received``) are skipped.
        for client in clients:
            messages = getattr(client.writer, "messages_received", None)
            if messages:
                print("\n".join(messages))


def _run_clients(num_servers):
    """Run every writer experiment in sequence."""
    _run_experiment(
        "sync writer", lambda: Consumer(sync_overhead), False, num_servers
    )
    _run_experiment(
        "async writer", lambda: Consumer(async_overhead), True, num_servers
    )
    _run_experiment("NoopConsumer", NoopConsumer, False, num_servers)
    _run_experiment(
        "SqliteConsumer synced writes", SqliteConsumer, False, num_servers
    )
    _run_experiment(
        "SqliteConsumer async writes", SqliteConsumer, True, num_servers
    )


def main(argv):
    """Dispatch on ``argv[1]`` ('server' or 'client') with count ``argv[2]``."""
    if len(argv) < 3:
        print("Provide either 'server' or 'client' as argument and a count")
        return
    arg = argv[1]
    try:
        num_servers = int(argv[2])
    except ValueError:
        # Report a bad count instead of surfacing a traceback.
        print(f"Count must be an integer, got '{argv[2]}'")
        return

    if arg == "server":
        print_settings()
        asyncio.run(_run_servers(num_servers))
    elif arg == "client":
        _run_clients(num_servers)
    else:
        print(f"Unknown arg '{arg}', exiting")


if __name__ == "__main__":
    main(sys.argv)