基本思想:利用c++实现RabbitMQ简单的生产者和消费者
CMakeList.txt
cmake_minimum_required(VERSION 3.16)
project(producer)
set(CMAKE_CXX_STANDARD 14)
add_executable(producer main.cpp)
target_link_libraries(producer rabbitmq)
** producer.cpp**
#include <iostream>
#include <string>
#include <unistd.h>
#include <amqp.h>
#include <cstring>
#include <amqp_framing.h>
#include <amqp_tcp_socket.h>
using namespace std;
int main() {
string hostName = "127.0.0.1"; // ip 默认值
int port = 5672; // 端口号 默认值
amqp_socket_t *socket = nullptr;
amqp_connection_state_t conn;
conn = amqp_new_connection();
socket = amqp_tcp_socket_new(conn);
if (!socket) {
cout << "create socket failed!";
exit(1);
}
if (amqp_socket_open(socket, hostName.c_str(), port)) {
cout << "opening TCP socket failed" << endl;
exit(1);
}
// 登录 ("/" 虚拟机默认值)
if(1 != amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest").reply_type) {
cout << "login failed" << endl;
}
// 创建管道(链接)
amqp_channel_open(conn, 1);
// 创建队列
amqp_bytes_t queue;
char mac[20] = "queue_test";
queue.bytes = mac;
queue.len = strlen(mac);
//声明队列
amqp_queue_declare_ok_t *declare = amqp_queue_declare(conn, 1, queue, 0, 0, 0, 1, amqp_empty_table);
int i = 0;
while (true) {
string str = "Hello World " + to_string(i);
char message[64] = {'\0'};
strcpy(message, str.c_str());
amqp_bytes_t message_bytes;
message_bytes.len = sizeof(message);
message_bytes.bytes = message;
// 发送消息
amqp_basic_publish(conn, 1, amqp_cstring_bytes(""), amqp_cstring_bytes("queue_test"), 0, 0, nullptr, message_bytes);
cout << "[已发送] " << str << endl;
i++;
sleep(1);
if(i == 5)
break;
}
cout << "send msg over! " << endl;
// 释放资源
amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
// getchar();
return 0;
}
队列中存储5条生产者消息
** consumer.cpp**
#include <iostream>
#include <string>
#include <amqp.h>
#include <ctime>
#include <unistd.h>
#include <amqp_framing.h>
#include <amqp_tcp_socket.h>
using namespace std;
void delay_msec(int msec)
{
clock_t now = clock();
while(clock() - now < msec);
}
int main() {
string name = "received";
string delayStr = "1";
//cout << "The delayStr is : " << delayStr << endl;
int delay = std::stoi(delayStr);
//cout << "The delay is : " << delay << endl;
//string name = "one";
//int delay = 1;
string hostName = "127.0.0.1";
int port = 5672;
amqp_socket_t *socket = nullptr;
amqp_connection_state_t conn;
conn = amqp_new_connection();
socket = amqp_tcp_socket_new(conn);
if(!socket){
cout << "create socket failed!";
exit(1);
}
if(amqp_socket_open(socket, hostName.c_str(), port)) {
cout << "opening TCP socket failed" << endl;
exit(1);
}
//登录
if(1 != amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest").reply_type) {
cout << "login failed" << endl;
exit(1);
}
amqp_channel_open(conn, 1);
while (true) {
amqp_basic_get(conn, 1, amqp_cstring_bytes("queue_test"), 1);
amqp_message_t *msg = new amqp_message_t;
amqp_read_message(conn, 1, msg, 0);
cout << "[" << name << "] The result is : " << (char *)msg->body.bytes << endl;
amqp_destroy_message(msg);
delete msg;
sleep(1);
// delay_msec(1000 * delay);
}
amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
amqp_destroy_connection(conn);
// getchar();
return 0;
}
本文转载自: https://blog.csdn.net/zhangbing0116/article/details/128949704
版权归原作者 MFT小白 所有, 如有侵权,请联系我们删除。
版权归原作者 MFT小白 所有, 如有侵权,请联系我们删除。