Last active
December 28, 2015 20:38
-
-
Save NikunjGithub/7558507 to your computer and use it in GitHub Desktop.
LibRabbitMQ consumer example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <iostream> | |
#include <string> | |
#include <sstream> | |
#include <boost/shared_ptr.hpp> | |
#include <boost/make_shared.hpp> | |
#include <boost/thread.hpp> | |
#include <amqp.h> | |
#include <amqp_framing.h> | |
#include "rabbitConfig.h" | |
#include "Exception.hpp" | |
//count to control number of messages cached by consumers | |
//if no caching is required then set to 1 | |
//if unlimited caching is required then set to 0 | |
//Count depends on the design of application, how producers and | |
//consumers are designed and what is the throughput desired. | |
int PREFETCH_COUNT = 1; | |
//count of conumsers | |
int CONSUMER_COUNT = 5; | |
std::string HOST = "<ipaddress>"; | |
int PORT = 5672; | |
std::string VHOST = "/"; | |
std::string USER = "guest"; | |
std::string PASSWORD = "guest"; | |
std::string QUEUENAME = "queue"; | |
std::string EXCHANGE = "NewExchange"; | |
std::string ROUTINGKEY = "routingKey"; | |
static const int CHANNEL = 1; //Defines AMQP Channel Count | |
static const int MANDATORY = 0; //Defines Queue Parameter | |
static const int IMMEDIATE = 0; | |
static const int CHANNEL_MAX = 0; //Defines Max. no. of Channels | |
static const int FRAME_MAX = 131072; // Defines max frame size | |
static const int HEART_BEAT = 0; // Defines heart beat time | |
static const int NO_LOCAL = 0; | |
static const int NO_ACK = 0; | |
static const int EXCLUSIVE = 0; | |
//Data class | |
class Data{ | |
public: | |
int _id; | |
// your class implementation | |
}; | |
//connect to queue and start consuming messages | |
void connectAndConsume() | |
{ | |
//new connection | |
amqp_connection_state_t | |
_conn = amqp_new_connection(); | |
int _sockfd = amqp_open_socket(HOST.c_str(),PORT); | |
if (_sockfd < 0) | |
{ | |
std::stringstream ss; | |
char* errstr = amqp_error_string(- _sockfd); | |
ss << "Cannot create socket: " << errstr; | |
free(errstr); | |
std::cout<<ss.str()<< __FILE__<< __LINE__<<std::endl; | |
} | |
amqp_set_sockfd(_conn,_sockfd); | |
//login | |
amqp_rpc_reply_t response = amqp_login(_conn | |
,VHOST.c_str() | |
,CHANNEL_MAX | |
,FRAME_MAX | |
,HEART_BEAT | |
,AMQP_SASL_METHOD_PLAIN | |
,USER.c_str() | |
,PASSWORD.c_str()); | |
if (response.reply_type != AMQP_RESPONSE_NORMAL) | |
{ | |
std::cout<<"Error while login."<</*response<<*/__FILE__<<":"<<__LINE__<<std::endl; | |
exit(1); | |
} | |
amqp_channel_open(_conn,1); | |
amqp_basic_qos_ok_t* qos = amqp_basic_qos(_conn, CHANNEL, 0, PREFETCH_COUNT ,false ); | |
//Declare queue | |
amqp_queue_declare_ok_t *r | |
= amqp_queue_declare(_conn, | |
1, | |
amqp_cstring_bytes(QUEUENAME.c_str()), | |
0, | |
1, | |
0, | |
0, | |
amqp_empty_table); | |
amqp_rpc_reply_t ret = amqp_get_rpc_reply(_conn); | |
if (ret.reply_type != AMQP_RESPONSE_NORMAL) | |
{ | |
std::cout<<"Error declaring queue."<</*ret<<*/__FILE__<<":"<<__LINE__<<std::endl; | |
} | |
if (r->queue.bytes == NULL) { | |
std::cout<<"Out of memory while copying queue name."<<__FILE__<<__LINE__<<std::endl; | |
} | |
//bind Queue for executing amqp commands | |
amqp_queue_bind(_conn, | |
1, | |
amqp_cstring_bytes(QUEUENAME.c_str()), | |
amqp_cstring_bytes(EXCHANGE.c_str()), | |
amqp_cstring_bytes(ROUTINGKEY.c_str()), | |
amqp_empty_table); | |
//register for consume | |
amqp_basic_consume(_conn, | |
CHANNEL, | |
amqp_cstring_bytes(QUEUENAME.c_str()), | |
amqp_empty_bytes, | |
NO_LOCAL, | |
NO_ACK, | |
EXCLUSIVE, | |
amqp_empty_table); | |
amqp_rpc_reply_t ret1 = amqp_get_rpc_reply(_conn); | |
if (ret1.reply_type != AMQP_RESPONSE_NORMAL) | |
{ | |
std::cout<<"Unable to send consume command"<</*ret1<<*/ __FILE__<< __LINE__<<std::endl; | |
} | |
int result; | |
//Consume Messages | |
while(true) | |
{ | |
amqp_frame_t frame; | |
amqp_maybe_release_buffers(_conn); | |
result = amqp_simple_wait_frame(_conn, &frame); | |
if (result < 0) | |
{ | |
std::cout<<"Error in header frame"<<__FILE__<<__LINE__<<std::endl; | |
} | |
if (frame.frame_type != AMQP_FRAME_METHOD) | |
{ | |
continue; | |
} | |
amqp_basic_deliver_t *d | |
= (amqp_basic_deliver_t *) frame.payload.method.decoded; | |
if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) | |
{ | |
continue; | |
} | |
result = amqp_simple_wait_frame(_conn, &frame); | |
if (result < 0) | |
{ | |
std::cout<<"Message frame is invalid!"<< __FILE__<< __LINE__<<std::endl; | |
} | |
if (frame.frame_type != AMQP_FRAME_HEADER) | |
{ | |
std::cout<<"Expected header!"<<__FILE__<< __LINE__<<std::endl; | |
} | |
amqp_basic_properties_t *p | |
= (amqp_basic_properties_t *) frame.payload.properties.decoded; | |
size_t body_size = frame.payload.properties.body_size; | |
size_t body_received = 0; | |
amqp_bytes_t body = amqp_bytes_malloc(body_size); | |
while (body_received < body_size) | |
{ | |
result = amqp_simple_wait_frame(_conn, &frame); | |
if (result < 0) { | |
std::cerr<< "Wait Frame less than Zero.."<< std::endl; | |
} | |
if (frame.frame_type != AMQP_FRAME_BODY) { | |
std::cout<<"Expected body frame!"<< __FILE__<<__LINE__<<std::endl; | |
} | |
void* body_ptr = reinterpret_cast<char*>(body.bytes) + body_received; | |
memcpy(body_ptr, frame.payload.body_fragment.bytes | |
, frame.payload.body_fragment.len); | |
body_received += frame.payload.body_fragment.len; | |
}//while ends | |
if(body_received != body_size) | |
{ | |
std::cerr << "Received Body is less than Body Size.." << std::endl; | |
} | |
amqp_basic_ack(_conn, 1,d->delivery_tag,0); | |
boost::shared_ptr<Data> _data = boost::make_shared<Data>(); | |
memcpy( _data.get(), static_cast<char*>(body.bytes), body.len); | |
amqp_bytes_free(body); | |
std::cout<<"Data id : "<<_data->_id<<std::endl; | |
} | |
} | |
int main() | |
{ | |
std::vector<boost::thread*> consumerList; | |
for(int x = 0 ; x < CONSUMER_COUNT; ++x) | |
{ | |
consumerList.push_back(new boost::thread( | |
connectAndConsume)); | |
} | |
for(std::vector<boost::thread*>::iterator itr = consumerList.begin(); | |
itr != consumerList.end(); ++itr) | |
{ | |
(*itr)->join(); | |
} | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment