Se me ocurren algunos enfoques.
Deje que el hardware capte todo el tráfico entrante y saliente y filtre los pedidos y las actualizaciones de pedidos. Esto tiene la ventaja de que no debería añadir ninguna latencia y elimina esta preocupación del código de su estrategia, pero requiere más hardware, más software y más trabajo operativo.
Capture todos los paquetes entrantes y salientes y procéselos en un proceso independiente. Esto no requiere ningún cambio en el hardware, pero introduce más software cuyo mantenimiento es necesario.
Las opciones anteriores corresponderían aproximadamente a NoopConsumer()
en el código siguiente.
(No probado) Compartir memoria utilizando mmap:
añadir sólo los datos que desea compartir desde su proceso de estrategia y leerlos desde otro proceso para mantenerse al día.
Escribir de forma asíncrona, como se hace a continuación. Probablemente sea más rápido si la sobrecarga de escribir es mayor que la de cambiar de contexto, lo cual parece probable, pero habría que comprobarlo experimentalmente.
Usar escrituras bloqueantes, lo cual puede ser aceptable si se puede escribir muy rápido.
Este código se escribió desde cero hoy y no soy un usuario habitual de asyncio,
por lo que probablemente pueda mejorarse. No obstante, creo que es un buen punto de partida.
A simple experiment to time different ways to save orders
Start the servers with
> python server 5
and start the experiment with
> python client 5
Servers can be reused.
Example output:
Connecting to 5 exchanges with sync writer
Connecting to 8080
Connecting to 8081
Connecting to 8082
Connecting to 8083
Connecting to 8084
Took: 5.667398263s
Took: 5.693023433s
Took: 5.707320206s
Took: 5.716781134s
Took: 5.720934913s
Connecting to 5 exchanges with async writer
Connecting to 8080
Connecting to 8081
Connecting to 8082
Connecting to 8083
Connecting to 8084
Took: 5.684973048s
Took: 5.700082285s
Took: 5.709185018s
Took: 5.713764676s
Took: 5.721333215s
Connecting to 5 exchanges with NoopConsumer
Connecting to 8080
Connecting to 8081
Connecting to 8082
Connecting to 8083
Connecting to 8084
Took: 2.466461159s
Took: 2.553055324s
Took: 2.563424393s
Took: 2.563971055s
Took: 2.60299299s
Connecting to 5 exchanges with SqliteConsumer synced writes
Connecting to 8080
Connecting to 8081
Connecting to 8082
Connecting to 8083
Connecting to 8084
Took: 10.003863398s
Took: 10.026773816s
Took: 10.058823117s
Took: 10.08572986s
Took: 10.12419061s
Connecting to 5 exchanges with SqliteConsumer async writes
Connecting to 8080
Connecting to 8081
Connecting to 8082
Connecting to 8083
Connecting to 8084
Took: 9.547965043s
Took: 9.583894856s
Took: 9.603546447s
Took: 9.623569694s
Took: 9.648144184s
import asyncio
import sqlite3
import sys
import time
import numpy as np
from websockets.server import serve
from websockets import connect
# Debug switch. NOTE(review): not read by any code visible in this file.
DEBUG = False

# --- Server (exchange simulator) settings ---
hostname = "localhost"  # host the exchanges bind to and the clients connect to
base_port = 8080  # exchange number i listens on base_port + i
num_orders = 1_000  # number of wait times drawn, i.e. orders sent per feed
average_wait_time = 2  # mean of the Poisson draw used for inter-order waits
speed = 1_000  # drawn wait times are divided by this to get seconds
p = 0.5  # NOTE(review): presumably a save probability; unused in visible code
seed = 42  # NOTE(review): presumably an RNG seed; unused in visible code

# --- Client settings ---
sync_overhead = 0.002  # overhead handed to Consumer in the sync-writer run
async_overhead = 0.002  # overhead handed to Consumer in the async-writer run
def print_settings():
    """Print the module-level server settings, one ``name: value`` per line.

    Reads the globals ``hostname``, ``base_port``, ``num_orders``, ``p``,
    ``speed`` and ``seed``; returns nothing.
    """
    # The template lines are kept flush-left so the printed text carries no
    # leading indentation.
    print(
        f"""hostname: {hostname}
base_port: {base_port}
num_orders: {num_orders}
p: {p}
speed: {speed}
seed: {seed}"""
    )
class NoopConsumer:
    """Takes a message and does nothing"""

    def write(self, exchange_id, order_id):
        """Discard the order.

        Gives the benchmark a baseline with no persistence cost at all.

        Args:
            exchange_id: Identifier of the exchange the order came from.
            order_id: Identifier of the order.

        Returns:
            None.
        """
        return None
class Consumer:
    """Consumes and writes a message and adds some overhead"""

    def __init__(self, overhead):
        """Create a consumer.

        Args:
            overhead: Seconds to wait on every write, standing in for the
                cost of a real persistence layer.
        """
        self.overhead = overhead
        self.messages_received = []

    def write(self, exchange_id, order_id):
        """Record the order after a blocking wait of ``overhead`` seconds.

        The wait uses ``time.sleep`` and therefore stalls the whole event
        loop when called from a coroutine; that is the cost the "sync
        writer" run is meant to expose.
        """
        time.sleep(self.overhead)
        # NOTE(review): storing an (exchange_id, order_id) tuple is an
        # assumption; nothing visible in this file reads the list back.
        self.messages_received.append((exchange_id, order_id))

    async def async_write(self, exchange_id, order_id):
        """Record the order after a cooperative wait of ``overhead`` seconds.

        ``asyncio.sleep`` yields to the event loop, so other tasks keep
        running while this write is "in flight".
        """
        await asyncio.sleep(self.overhead)
        self.messages_received.append((exchange_id, order_id))
class SqliteConsumer:
    """Non-thread safe writer to a sqlite3 in 'test_db.sqlite3'"""

    # Placeholders are used instead of interpolating the values into the SQL
    # text: the ids arrive as strings split from a websocket message.
    _INSERT_SQL = "INSERT INTO orders (exchange_id, order_id) VALUES (?, ?)"

    def __init__(self):
        """Open 'test_db.sqlite3' and (re)create an empty ``orders`` table."""
        self.con = sqlite3.connect("test_db.sqlite3")
        self.cur = self.con.cursor()
        # Start every run from an empty table.
        self.cur.execute("DROP TABLE IF EXISTS orders")
        self.cur.execute("CREATE TABLE orders(exchange_id int, order_id int)")

    def write(self, exchange_id, order_id):
        """Insert one order row and commit it.

        Args:
            exchange_id: Identifier of the exchange the order came from.
            order_id: Identifier of the order.
        """
        self.cur.execute(self._INSERT_SQL, (exchange_id, order_id))
        self.con.commit()

    async def async_write(self, exchange_id, order_id):
        """Coroutine counterpart of :meth:`write`.

        The insert still runs inline on the calling thread (the connection
        is not shared across threads), so this coroutine does not yield to
        the event loop while writing.
        """
        self.cur.execute(self._INSERT_SQL, (exchange_id, order_id))
        self.con.commit()
class Exchange:
    """Bare functionality to model an exchange

    An exchange listens on hostname:port and sends exchange_id:order_id over a
    websocket at random intervals.
    """

    def __init__(self, exchange_id, hostname, port, average_wait_time):
        """Draw the wait times for one feed and print a summary.

        Args:
            exchange_id: Identifier placed in front of every order message.
            hostname: Host to listen on.
            port: Port to listen on.
            average_wait_time: Mean of the Poisson distribution the waits
                are drawn from (before division by the global ``speed``).
        """
        self.exchange_id = exchange_id
        self.hostname = hostname
        self.port = port
        # One wait per order; the same schedule is replayed for every feed.
        self.wait_times = np.random.poisson(average_wait_time, num_orders)
        print(f"Creating {self.exchange_id} on {self.hostname}:{self.port}")
        print(f"Average wait time param: {average_wait_time}")
        print(f"Average wait time: {self.wait_times.sum() / (speed * num_orders)}")
        print(f"Total wait time: {self.wait_times.sum() / speed}")

    async def order_feed(self, websocket):
        """Serve one connection: on "start", stream every order, then "done".

        Messages other than "start" are ignored.

        Args:
            websocket: The server-side connection handed over by ``serve``.
        """
        async for message in websocket:
            if message == "start":
                start = time.time_ns()
                send_time = 0  # nanoseconds spent inside websocket.send
                for i, wait_time in enumerate(self.wait_times):
                    # Scale the drawn wait down to seconds.
                    await asyncio.sleep(wait_time / speed)
                    start_send = time.time_ns()
                    await websocket.send(f"{self.exchange_id}:{i}")
                    send_time += time.time_ns() - start_send
                await websocket.send("done")
                print(
                    f"Took {(time.time_ns() - start) / 1e9}s to send all orders on {self.exchange_id}"
                )
                print(f"Total send time: {send_time / 1e9}s")

    async def run(self):
        """Listen on hostname:port until the task is cancelled."""
        async with serve(self.order_feed, self.hostname, self.port):
            # A future that is never resolved keeps the server alive.
            await asyncio.Future()
class Client:
    """Connects to an exchange and records some results

    The feed is started with 'start' and stopped when the message 'done' is
    received. A coin flip is performed to decide whether the message is saved.
    """

    def __init__(self, port, writer, write_async):
        """Create a client.

        Args:
            port: Port of the exchange to connect to (host is the global
                ``hostname``).
            writer: Object providing ``write(exchange_id, order_id)`` and,
                when ``write_async`` is true, an awaitable
                ``async_write(exchange_id, order_id)``.
            write_async: If true, save through ``await writer.async_write``;
                otherwise through the blocking ``writer.write``.
        """
        self.port = port
        self.writer = writer
        self.write_async = write_async

    async def run(self):
        """Consume the whole feed and print how long it took."""
        print(f"Connecting to {self.port}")
        async with connect(f"ws://{hostname}:{self.port}") as websocket:
            await websocket.send("start")
            start = time.time_ns()
            async for message in websocket:
                if message == "done":
                    # End-of-feed marker: it has no ':' and must not be split.
                    break
                # Coin flip: save roughly half of the orders.
                if np.random.choice([True, False]):
                    exchange_id, order_id = message.split(":")
                    if self.write_async:
                        await self.writer.async_write(exchange_id, order_id)
                    else:
                        self.writer.write(exchange_id, order_id)
            print(f"Took: {(time.time_ns() - start) / 1e9}s")
class Strategy:
"""A strategy holds mulitple connections"""
def __init__(self, clients):
self.clients = clients
async def run_all(self):
async with asyncio.TaskGroup() as tg:
for client in self.clients:
if __name__ == "__main__":
    # Usage: <script> server|client <count>
    if len(sys.argv) < 3:
        print("Provide either 'server' or 'client' as argument and a count")
        # Without both arguments the indexing below would raise.
        sys.exit(1)

    arg = sys.argv[1]
    num_servers = int(sys.argv[2])

    if arg == "server":

        async def run_servers(num_servers):
            """Start ``num_servers`` exchanges on consecutive ports."""
            async with asyncio.TaskGroup() as tg:
                for i in range(num_servers):
                    exchange = Exchange(
                        i + 1, hostname, base_port + i, average_wait_time
                    )
                    tg.create_task(exchange.run())

        asyncio.run(run_servers(num_servers))

    elif arg == "client":
        # (label, writer factory, write_async) for each timed scenario, run
        # one after another against the same servers. A factory is used so
        # every client gets its own writer instance, built just before its
        # scenario starts.
        scenarios = [
            ("sync writer", lambda: Consumer(sync_overhead), False),
            ("async writer", lambda: Consumer(async_overhead), True),
            ("NoopConsumer", lambda: NoopConsumer(), False),
            ("SqliteConsumer synced writes", lambda: SqliteConsumer(), False),
            ("SqliteConsumer async writes", lambda: SqliteConsumer(), True),
        ]
        for label, make_writer, write_async in scenarios:
            print(f"Connecting to {num_servers} exchanges with {label}")
            clients = [
                Client(base_port + i, make_writer(), write_async)
                for i in range(num_servers)
            ]
            asyncio.run(Strategy(clients).run_all())

    else:
        print(f"Unknown arg '{arg}', exiting")