This example shows how to listen on a websocket for updates to Hypothesis.
Last active
September 20, 2021 14:48
-
-
Save judell/e85a3d8e7a23c247f672aaf95b6c3da9 to your computer and use it in GitHub Desktop.
minimal hypothesis websocket client for python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import ssl | |
import uuid | |
import json | |
import certifi | |
import websockets | |
import traceback | |
def _ssl_context(verify=True): | |
ssl_context = ssl.create_default_context(cafile=certifi.where()) | |
if not verify: | |
ssl_context.check_hostname = False | |
ssl_context.verify_mode = ssl.CERT_NONE | |
return ssl_context | |
def main(): | |
async def hello(uri): | |
async with websockets.connect(uri, ssl=_ssl_context(verify=True)) as ws: | |
try: | |
msg = json.dumps({'type': 'whoami','id': 1}) # expect: {"reply_to": 1, "type": "whoyouare", "userid": "acct:[email protected]", "ok": true} | |
await ws.send(msg) | |
msg = json.dumps({"filter":{"match_policy":"include_any","clauses":[{"field":"/group","operator":"one_of","value":["__world__"],}],"actions":{"create":True,"update":True,"delete":True}}}) | |
await ws.send(msg) | |
while True: | |
print ('waiting') | |
print(await(ws.recv())) | |
except: | |
print ('exception! closing socket') | |
ws.close() | |
raise | |
while True: | |
try: | |
asyncio.get_event_loop().run_until_complete( | |
hello('wss://hypothes.is/ws?access_token=...')) | |
except: | |
traceback.print_exc() | |
continue | |
if __name__ == '__main__': | |
main() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment