Skip to content

Instantly share code, notes, and snippets.

Last active March 21, 2024 16:40
Show Gist options
  • Save jtbandes/6b632b45e4a88ce178d7c9b7b1810b91 to your computer and use it in GitHub Desktop.
Save jtbandes/6b632b45e4a88ce178d7c9b7b1810b91 to your computer and use it in GitHub Desktop.
#include <chrono>
#include <csignal>
#include <mcap/writer.hpp>
#include <mqtt/client.h>
#include <string>
#include <tuple>
#include <unordered_map>
#include <CLI/App.hpp>
#include <CLI/Config.hpp>
#include <CLI/Formatter.hpp>
using namespace std::chrono_literals;
// Shutdown flag polled by the connect loop and the receive loop in main().
// NOTE(review): no store to this flag is visible in this copy of the file — presumably
// SignalHandler sets it to true; confirm against the original.
// NOTE(review): std::atomic_bool is declared in <atomic>, which is not among the includes
// visible above — presumably pulled in transitively; confirm.
std::atomic_bool signal_quit(false);
// Signal handler that main() registers for SIGINT; prints a shutdown notice to stdout.
// NOTE(review): the braces of the body and, presumably, a statement setting signal_quit
// are not visible in this copy of the file — confirm against the original.
// NOTE(review): stream output is generally not async-signal-safe; verify this is acceptable
// for the target platform.
void SignalHandler(int signal)
std::cout << "Quitting..." << std::endl;;
// Return the stamp property if set, otherwise use the current time
//
// properties:     MQTT properties of a message; taken by value, so the caller's object
//                 is copied on each call.
// stamp_property: name of the MQTT user property to look for. The scan below is only
//                 relevant when this name is non-empty.
//
// When a user property whose name equals stamp_property is found, its value is parsed
// with std::stoull and returned immediately (the first match wins). std::stoull throws
// std::invalid_argument or std::out_of_range when the value is not a valid unsigned
// integer, and that exception is not caught here.
// NOTE(review): braces are not visible in this copy of the file, and the fallback return
// at the end is cut off after "duration_cast<std::chrono::nanoseconds>(" — presumably it
// converts the current clock time to nanoseconds, as the comment above says; confirm
// against the original.
mcap::Timestamp GetTimestampFromProperties(
mqtt::properties properties, const std::string& stamp_property)
if (!stamp_property.empty())
// Walk every USER_PROPERTY entry attached to the message
size_t property_count = properties.count(mqtt::property::USER_PROPERTY);
for (size_t i = 0; i < property_count; ++i)
auto property = properties.get(mqtt::property::USER_PROPERTY, i);
// A user property is a (name, value) string pair
const auto& [property_name, property_value] = mqtt::get<mqtt::string_pair>(property);
if (property_name == stamp_property)
return std::stoull(property_value);
// Fallback when no matching property was returned above (expression truncated in this copy)
return std::chrono::duration_cast<std::chrono::nanoseconds>(
// Program entry point: records messages received over MQTT into an MCAP file.
//
// Visible steps, in order:
//   1. install SignalHandler for SIGINT;
//   2. parse the command line (--host, -o/--output, --topics, --stamp-property);
//   3. split each --topics argument into a topic pattern and a message encoding;
//   4. create an MQTT v5 async client and retry connecting until it succeeds or
//      signal_quit is set;
//   5. set up the MCAP writer and register one subscription identifier per topic pattern;
//   6. consume messages until signal_quit is set, creating a channel for each topic on
//      first sight and writing every message to the MCAP writer.
//
// Returns 0 when quit is requested before a connection is established. Throws
// std::runtime_error when the writer cannot be opened, when a write fails, or when the
// encoding for a received message cannot be determined.
//
// NOTE(review): this copy of the file has lost brace-only lines, some whole lines and
// some tokens inside lines (flagged individually below). The comments describe only what
// the visible text shows; confirm the flagged spots against the original.
int main(int argc, char** argv)
// Set up signal handler
signal(SIGINT, SignalHandler);
// Parse command line arguments
CLI::App app;
std::string hostname = "tcp://localhost:1883";
app.add_option("--host", hostname, "Broker url");
std::string output_path = "output.mcap";
app.add_option("-o,--output", output_path, "Output path");
std::vector<std::string> topic_patterns_to_encodings_args;
// NOTE(review): the start of this statement (presumably "app.add_option(") and its
// terminator are not visible in this copy.
"--topics", topic_patterns_to_encodings_args, "Topics and message encoding split by :")
std::string stamp_property;
app.add_option("--stamp-property", stamp_property,
"Name of the MQTT user property to use as publish time in nanoseconds (default empty)");
CLI11_PARSE(app, argc, argv);
// Maps topic pattern -> message encoding ("" when the argument carried no encoding)
std::unordered_map<std::string, std::string> topic_patterns_to_encodings;
// Assert topics to encodings
for (const auto& arg : topic_patterns_to_encodings_args)
// Split by colon
// Only the first ':' is used as the separator; everything after it is the encoding.
auto pos = arg.find(':');
if (pos == std::string::npos)
// No encoding specified in the argument
topic_patterns_to_encodings.emplace(arg, "");
// Encoding specified in the argument
topic_patterns_to_encodings.emplace(arg.substr(0, pos), arg.substr(pos + 1));
// Set up MQTT client
// The client is created for MQTT version 5 with the fixed client id "mqtt_mcap".
auto create_options = mqtt::create_options_builder().mqtt_version(MQTTVERSION_5).finalize();
auto mqtt_client_ = std::make_shared<mqtt::async_client>(hostname, "mqtt_mcap", create_options);
// NOTE(review): the builder chain that configures connect_options is not visible in
// this copy.
auto connect_options = mqtt::connect_options_builder()
// In order to not miss any messages, setup client to receive messages before connecting
// Repeatedly trying to connect until successful
while (!signal_quit)
// Start connecting
// NOTE(review): the try block, the wait on connect_token and the exit from this loop
// are not visible in this copy.
auto connect_token = mqtt_client_->connect(connect_options);
catch (const mqtt::exception& e)
std::cout << "MQTT connection failed: " << e.what() << std::endl;
// Quit was requested while still trying to connect: exit without recording anything
if (signal_quit)
return 0;
std::cout << "MQTT connection successful" << std::endl;
// Set up MCAP writer
mcap::McapWriter writer;
mcap::McapWriterOptions opts("mqtt");
// NOTE(review): the call on the right-hand side is garbled in this copy — presumably
// it opens the writer on output_path with opts; confirm.
auto status =, opts);
if (!status.ok())
std::cerr << "Failed to open mcap writer: " << status.message << "\n";
throw std::runtime_error("could not open mcap writer");
// Maps MQTT topic -> MCAP channel id; filled in the receive loop when a topic is first seen
std::unordered_map<std::string, mcap::ChannelId> channel_map;
// Maps subscription identifier -> message encoding configured for that subscription
std::unordered_map<size_t, std::string> subscription_id_to_encoding;
int subscription_id = 0;
// Set up subscribers, with incrementing subscription identifiers
// NOTE(review): no increment of subscription_id is visible in this copy, and the first
// identifier used is 0; MQTT 5 is understood to require non-zero subscription
// identifiers — confirm against the original and the MQTT 5 specification.
for (const auto& [topic_pattern, encoding] : topic_patterns_to_encodings)
subscription_id_to_encoding.emplace(subscription_id, encoding);
auto properties = mqtt::properties();
properties.add(mqtt::property(mqtt::property::SUBSCRIPTION_IDENTIFIER, subscription_id));
// NOTE(review): the start of this call (presumably a subscribe on mqtt_client_) is not
// visible in this copy.
topic_pattern, DEFAULT_TOPIC_QOS, mqtt::subscribe_options(), properties);
// Receive loop: runs until signal_quit is set
while (!signal_quit)
mqtt::const_message_ptr msg_ptr;
// Wait up to one second for a message.
// NOTE(review): the statement taken when nothing was received (presumably "continue")
// is not visible in this copy.
if (!mqtt_client_->try_consume_message_for(&msg_ptr, 1s))
std::cout << "Received message: " << msg_ptr->get_topic() << std::endl;
std::string topic = msg_ptr->get_topic();
auto properties = msg_ptr->get_properties();
// Look up the channel for this topic; the block below handles the first-seen case.
// Note that the lookup is performed twice (once for search_it, once in the condition).
auto search_it = channel_map.find(topic);
if (channel_map.find(topic) == channel_map.end())
// Use subscription identifier to find the corresponding encoding
std::optional<std::string> encoding;
size_t property_count = properties.count(mqtt::property::SUBSCRIPTION_IDENTIFIER);
for (size_t i = 0; i < property_count; ++i)
auto property = properties.get(mqtt::property::SUBSCRIPTION_IDENTIFIER, i);
int subscription_identifier = mqtt::get<int>(property);
// An identifier that was never registered above is treated as a fatal error
if (subscription_id_to_encoding.count(subscription_identifier) == 0)
throw std::runtime_error(
"received unexpected message with unknown subscription "
"identifier '" +
std::to_string(subscription_identifier) + "'");
// NOTE(review): the initializer is missing in this copy — presumably the encoding stored
// in subscription_id_to_encoding for subscription_identifier; confirm.
std::string found_encoding =;
// All subscriptions that matched this message must agree on a single encoding
if (encoding && encoding != found_encoding)
throw std::runtime_error("multiple encodings found for same topic '" + topic +
"': '" + *encoding + "' '" + found_encoding + "'");
encoding = found_encoding;
// No subscription identifier on the message means no encoding could be resolved
if (!encoding)
throw std::runtime_error(
"received unexpected message with no encoding found for topic '" + topic + "'");
// NOTE(review): the third constructor argument is presumably a schema id (0) — confirm
// against the MCAP API.
mcap::Channel channel(topic, *encoding, 0);
// NOTE(review): this statement is garbled in this copy — presumably the channel is
// registered with the writer and its id stored in channel_map; confirm.
search_it = channel_map.emplace(topic,;
mcap::ChannelId channel_id = search_it->second;
const std::string& payload = msg_ptr->get_payload_str();
// Construct a MCAP message
mcap::Message msg;
// NOTE(review): the second statement on the next line is garbled in this copy —
// presumably it points the message data at the payload bytes; confirm.
msg.channelId = channel_id; = reinterpret_cast<const std::byte*>(;
msg.dataSize = payload.size();
// Log time and publish time are both set to the same timestamp
msg.logTime = GetTimestampFromProperties(properties, stamp_property);
msg.publishTime = msg.logTime;
auto status = writer.write(msg);
if (!status.ok())
std::cerr << "Failed to write message: " << status.message << "\n";
throw std::runtime_error("could not write message");
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment