Skip to content

Instantly share code, notes, and snippets.

@k-okada
Last active July 22, 2022 11:58
Show Gist options
  • Save k-okada/4d0dabe396a6af37ca0511d40b4e21e4 to your computer and use it in GitHub Desktop.
Save k-okada/4d0dabe396a6af37ca0511d40b4e21e4 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
import rospy
import pickle
import base64
from smach_msgs.msg import SmachContainerStatus
import cv2
from cv_bridge import CvBridge
from sensor_msgs.msg import Image
class SmachToMail():
def __init__(self):
rospy.init_node('server_name') # it should be 'smach_to_mail', but 'server_name' is the default name of smach_ros
rospy.Subscriber("~smach/container_status", SmachContainerStatus, self._status_cb)
self.bridge = CvBridge()
def _status_cb(self, msg):
if len(msg.active_states) == 0:
return
status_str = ', '.join(msg.active_states)
local_data_str = pickle.loads(msg.local_data)
info_str = msg.info
print("status -> {}".format(status_str))
print("description_str -> {}".format(local_data_str['DESCRIPTION']))
#print("image_str -> {}".format(local_data_str['IMAGE']))
if local_data_str['IMAGE'] :
msg = Image()
msg.deserialize(base64.b64decode(local_data_str['IMAGE']))
cv_image = self.bridge.imgmsg_to_cv2(msg, msg.encoding)
cv2.imshow("Image", cv_image)
cv2.waitKey(2)
print("info_str -> {}".format(info_str))
stm = SmachToMail()
rospy.spin()
#!/usr/bin/env python
import rospy
import pickle
import base64
from smach_msgs.msg import SmachContainerStatus
import numpy as np
import cv2
from sensor_msgs.msg import CompressedImage
class SmachToMail():
def __init__(self):
rospy.init_node('server_name') # it should be 'smach_to_mail', but 'server_name' is the default name of smach_ros
rospy.Subscriber("~smach/container_status", SmachContainerStatus, self._status_cb)
self.cv_image = None
def _status_cb(self, msg):
if len(msg.active_states) == 0:
return
status_str = ', '.join(msg.active_states)
local_data_str = pickle.loads(msg.local_data)
info_str = msg.info
print("status -> {}".format(status_str))
print("description_str -> {}".format(local_data_str['DESCRIPTION']))
#print("image_str -> {}".format(local_data_str['IMAGE']))
if local_data_str['IMAGE'] :
msg = CompressedImage()
msg.deserialize(base64.b64decode(local_data_str['IMAGE']))
np_arr = np.fromstring(msg.data, np.uint8)
self.cv_image = cv2.imdecode(np_arr, cv2.IMREAD_COLOR)
print("info_str -> {}".format(info_str))
def spin(self):
rate = rospy.Rate(10)
while not rospy.is_shutdown():
if not self.cv_image is None:
cv2.imshow("Image", self.cv_image)
cv2.waitKey(2)
rate.sleep()
stm = SmachToMail()
stm.spin()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment