Skip to content

Instantly share code, notes, and snippets.

@dankrause
Last active July 8, 2024 02:05
Show Gist options
  • Save dankrause/6000248 to your computer and use it in GitHub Desktop.
Save dankrause/6000248 to your computer and use it in GitHub Desktop.
Tiny python SSDP discovery library with no external dependencies
# Copyright 2014 Dan Krause
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import io
import socket

try:
    import httplib  # Python 2
except ImportError:
    import http.client as httplib  # Python 3: same API under a new name

try:
    import StringIO  # Python 2 only; kept for backward compatibility
except ImportError:
    StringIO = None
class SSDPResponse(object):
    """Headers of interest parsed from one raw SSDP search response.

    Attributes:
        location: value of the LOCATION header, or None if absent.
        usn: value of the USN header, or None if absent.
        st: value of the ST header, or None if absent.
        cache: text following the first "=" of the CACHE-CONTROL header with
            surrounding whitespace removed (e.g. "1800" for
            "max-age = 1800"), or None when the header is absent or
            contains no "=".
    """

    class _FakeSocket(io.BytesIO):
        """In-memory buffer exposing the one socket method HTTPResponse needs."""

        def makefile(self, *args, **kw):
            # HTTPResponse reads from whatever makefile() returns; the
            # buffer itself already behaves like a readable binary file.
            return self

    def __init__(self, response):
        """Parse *response*, the raw datagram payload of an SSDP reply.

        Args:
            response: the response as bytes; text is encoded as UTF-8 first.
        """
        if not isinstance(response, bytes):
            response = response.encode("utf-8")
        r = httplib.HTTPResponse(self._FakeSocket(response))
        r.begin()
        self.location = r.getheader("location")
        self.usn = r.getheader("usn")
        self.st = r.getheader("st")
        self.cache = self._parse_cache(r.getheader("cache-control"))

    @staticmethod
    def _parse_cache(cache_control):
        """Return the value after "=" in a CACHE-CONTROL header, else None."""
        if cache_control is None:
            return None
        pieces = cache_control.split("=")
        if len(pieces) < 2:
            # A directive without a value (e.g. "no-cache") has no max-age.
            return None
        return pieces[1].strip()

    def __repr__(self):
        return "<SSDPResponse({0}, {1}, {2})>".format(
            self.location, self.st, self.usn)
def discover(service, timeout=5, retries=1, mx=3):
    """Multicast an SSDP M-SEARCH request and collect the replies.

    Args:
        service: search target sent in the ST header (e.g. "roku:ecp").
        timeout: seconds to wait for each further reply; an attempt ends
            at the first receive that times out.
        retries: number of search attempts; each sends a fresh M-SEARCH
            from a new socket.
        mx: value sent in the MX header.

    Returns:
        A list of SSDPResponse objects, one per distinct ``location``
        (a later reply with the same location replaces the earlier one).
    """
    group = ("239.255.255.250", 1900)
    message = "\r\n".join([
        'M-SEARCH * HTTP/1.1',
        'HOST: {0}:{1}',
        'MAN: "ssdp:discover"',
        'ST: {st}',
        'MX: {mx}',
        '',
        '',
    ])
    # The request is identical for every attempt, so build it once.
    payload = message.format(*group, st=service, mx=mx)
    if not isinstance(payload, bytes):
        # sendto() requires bytes on Python 3.
        payload = payload.encode("utf-8")

    responses = {}
    for _ in range(retries):
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        try:
            # Per-socket timeout: socket.setdefaulttimeout() would silently
            # change the behaviour of every other socket in the process.
            sock.settimeout(timeout)
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
            sock.sendto(payload, group)
            while True:
                try:
                    data = sock.recv(1024)
                except socket.timeout:
                    break
                response = SSDPResponse(data)
                responses[response.location] = response
        finally:
            # Release the descriptor even if sending or parsing fails.
            sock.close()
    return list(responses.values())
# Example:
# import ssdp
# ssdp.discover("roku:ecp")
@JJC1138
Copy link

JJC1138 commented Feb 1, 2016

Here's the Swift port I mentioned in case it's of any use to anyone.

@mraineri
Copy link

@JJC1138 - I've made the same observation myself; I think it would be better if the default timeout were set to something like 5 to accommodate the potential worst case response delay.

Copy link

ghost commented Dec 10, 2016

Python 3 version:

From f81513300e2233d3d2a42fa90dcd60fb52351149 Mon Sep 17 00:00:00 2001
From: Adam Baxter <[email protected]>
Date: Sat, 10 Dec 2016 15:44:49 +1100
Subject: [PATCH] Python 3 version

---
 ssdp.py | 16 +++++++++-------
 1 file changed, 9 insertions(+), 7 deletions(-)

diff --git a/ssdp.py b/ssdp.py
index acd330a..4a96bf7 100644
--- a/ssdp.py
+++ b/ssdp.py
@@ -1,4 +1,4 @@
-#   Copyright 2014 Dan Krause
+#   Copyright 2014 Dan Krause, Python 3 hack 2016 Adam Baxter
 #
 #   Licensed under the Apache License, Version 2.0 (the "License");
 #   you may not use this file except in compliance with the License.
@@ -13,15 +13,15 @@
 #   limitations under the License.
 
 import socket
-import httplib
-import StringIO
+import http.client
+import io
 
 class SSDPResponse(object):
-    class _FakeSocket(StringIO.StringIO):
+    class _FakeSocket(io.BytesIO):
         def makefile(self, *args, **kw):
             return self
     def __init__(self, response):
-        r = httplib.HTTPResponse(self._FakeSocket(response))
+        r = http.client.HTTPResponse(self._FakeSocket(response))
         r.begin()
         self.location = r.getheader("location")
         self.usn = r.getheader("usn")
@@ -43,14 +43,16 @@ def discover(service, timeout=5, retries=1, mx=3):
         sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
         sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
         sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
-        sock.sendto(message.format(*group, st=service, mx=mx), group)
+        message_bytes = message.format(*group, st=service, mx=mx).encode('utf-8')
+        sock.sendto(message_bytes, group)
+
         while True:
             try:
                 response = SSDPResponse(sock.recv(1024))
                 responses[response.location] = response
             except socket.timeout:
                 break
-    return responses.values()
+    return list(responses.values())
 
 # Example:
 # import ssdp
-- 
2.10.1.windows.1

@rexxy-sasori
Copy link

I use your code, but it keeps timing out. Could you please give any advice for debugging? I'm using Ubuntu, trying to discover a Sony Alpha 6000 camera.

@pedrinho
Copy link

Thanks for this code. I'm using it in https://github.com/pedrinho/samsung_remote to find Samsung TVs on the network, and it works like a charm!

@jnk5y
Copy link

jnk5y commented Aug 9, 2017

I've run this successfully on a Fedora machine and can't get it to find any rokus on an Ubuntu machine. Both on the same network both wirelessly connected. Don't understand why it would discover rokus on one and not the other. Anyone else had this issue?

@George-ou812
Copy link

Hi,

Great bit of code. I've been chasing a real gssdp / gupnp problem and am wondering how easy it would be to turn this code into an M-SEARCH listener. I need to prove that my Linux gssdp/gupnp libraries are actually doing their job. Rygel, which uses these libs, seems to miss a lot of SSDP searches from various apps like iPad MConnect. The big question is whether the Python libs use the same libraries underneath.

Thanks,

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment