Last active
September 7, 2017 15:06
-
-
Save alexgtn/6f171403a778f33771f5307e03a3aabf to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package io.vertx.mqtt.test.server; | |
import io.netty.handler.codec.mqtt.MqttQoS; | |
import io.vertx.core.Vertx; | |
import io.vertx.core.logging.Logger; | |
import io.vertx.core.logging.LoggerFactory; | |
import io.vertx.ext.unit.TestContext; | |
import io.vertx.ext.unit.junit.VertxUnitRunner; | |
import io.vertx.mqtt.MqttEndpoint; | |
import io.vertx.mqtt.MqttServer; | |
import io.vertx.mqtt.MqttServerOptions; | |
import io.vertx.mqtt.routing.Route; | |
import io.vertx.mqtt.routing.Router; | |
import org.junit.After; | |
import org.junit.Before; | |
import org.junit.Test; | |
import org.junit.runner.RunWith; | |
@RunWith(VertxUnitRunner.class) | |
public class MqttServerRouterTest { | |
private static final Logger log = LoggerFactory.getLogger(MqttServerRouterTest.class); | |
protected static final String MQTT_SERVER_HOST = "localhost"; | |
protected static final int MQTT_SERVER_PORT = 1883; | |
MqttEndpoint mqttEndpoint; | |
private Vertx vertx; | |
@Before | |
public void before(TestContext context) { | |
this.vertx = Vertx.vertx(); | |
} | |
@After | |
public void after(TestContext context) { | |
this.vertx.close(); | |
} | |
@Test | |
public void messageRouting(TestContext context) { | |
MqttServer mqttServer = MqttServer.create( | |
this.vertx, | |
new MqttServerOptions() | |
.setHost(MQTT_SERVER_HOST) | |
.setPort(MQTT_SERVER_PORT) | |
); | |
Router router = Router.router(vertx); | |
mqttServer.endpointHandler(endpoint -> { | |
endpoint | |
.publishHandler(router::accept) | |
.publishReleaseHandler(endpoint::publishComplete); | |
mqttEndpoint = endpoint; | |
endpoint.accept(true); | |
}).listen(ar -> { | |
if (ar.succeeded()) { | |
log.info("MQTT server listening on port " + ar.result().actualPort()); | |
latchListen.countDown(); | |
} else { | |
log.error("Error starting MQTT server", ar.cause()); | |
} | |
}); | |
Route testRoute = router.route().topic("/foo/+/bar"); | |
testRoute.mqttMessageHandler(message -> { | |
System.out.println("Got a message on topic: " + message.topicName()); | |
if (message.qosLevel() == MqttQoS.AT_LEAST_ONCE) { | |
mqttEndpoint.publishAcknowledge(message.messageId()); | |
} else if (message.qosLevel() == MqttQoS.EXACTLY_ONCE) { | |
mqttEndpoint.publishRelease(message.messageId()); | |
} | |
}); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment