Created
November 17, 2021 22:34
-
-
Save whittlem/c1e8021f2575da6eca5a348130647e59 to your computer and use it in GitHub Desktop.
rich-eodhistoricaldata-websocket
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def main() -> None:
    """Stream live crypto prices and render each symbol as a coloured tile.

    Starts a ``WebSocketClient`` on the "crypto" endpoint for every symbol in
    ``symbol_list`` and then polls it forever inside a ``Live`` display.
    Whenever the client's ``message_count`` changes, the latest message is
    read; if it carries a symbol (``"s"``) and a price (``"p"``) for a symbol
    tracked in ``symbol_dict``, the entry ``symbol_dict[symbol]`` is updated
    to ``[price, direction]`` and the matching ``layout`` region is redrawn.

    ``direction`` is ``0`` for the first price seen, ``1`` when the new price
    is higher than the stored one (green tile) and ``-1`` when it is lower
    (red tile).  An unchanged price keeps the previous entry.

    This function never returns normally; it runs until interrupted.
    """
    import time  # local import: only the polling loop needs it

    # Tile styles keyed by the direction flag stored in symbol_dict.
    direction_styles = {
        1: "bold white on dark_green",  # price went up
        -1: "bold white on red",  # price went down
    }
    neutral_style = "bold black on white"  # first price / no recorded move
    poll_interval = 0.01  # seconds; keeps the loop from spinning a CPU core

    with Live(layout, screen=True, redirect_stderr=False):
        websocket = WebSocketClient(
            # Demo API key for testing purposes
            api_key="<removed>",
            endpoint="crypto",
            symbols=symbol_list,
        )
        websocket.start()

        message_count = 0
        while True:
            time.sleep(poll_interval)

            if not websocket:
                continue

            # Snapshot the counter and message once so a single, consistent
            # message is processed even if the client replaces it meanwhile.
            # NOTE(review): presumably the client updates these from a
            # background thread after start() - confirm.
            latest_count = websocket.message_count
            if latest_count == message_count:
                continue
            message_count = latest_count
            message = websocket.message

            if "s" not in message or "p" not in message:
                continue
            symbol = message["s"]
            if symbol not in symbol_dict:
                continue

            new_price = float(message["p"])
            stored_price = symbol_dict[symbol][0]

            if stored_price == 0:
                # First price for this symbol: nothing to compare against.
                symbol_dict[symbol] = [new_price, 0]
            elif new_price > float(stored_price):
                symbol_dict[symbol] = [round(new_price, 2), 1]  # price went up
            elif new_price < float(stored_price):
                symbol_dict[symbol] = [round(new_price, 2), -1]  # price went down
            # Equal prices fall through and keep the previous entry.

            shown_price = round(float(symbol_dict[symbol][0]), 2)
            direction = float(symbol_dict[symbol][1])
            layout[symbol].update(
                Text(
                    f"{symbol}\n{shown_price}",
                    style=direction_styles.get(direction, neutral_style),
                    justify="center",
                )
            )
# Script entry point: start the live price display only when this file is
# executed directly, not when it is imported as a module.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment