Pull to refresh

RabbitMQ tutorials на C++

Reading time8 min
Views33K
На сайте rabbitmq.com в разделе tutorials приведены примеры реализации на различных языках, но среди них нет C++. Под катом собраны ссылки на переведенные руководства, материалы и код под спойлером.

Тем, кому удобнее просматривать код через интерфейс GitHub, можно сразу перейти в репозиторий.

Данный материал использует реализацию клиента AMQP-CPP и POCO C++ для работы с сокетом.

«RabbitMQ tutorial 1 — Hello World»

receive.cpp
#include <iostream>
#include "SimplePocoHandler.h"

/// Tutorial 1 consumer: declares the "hello" queue and prints every message
/// delivered from it until the handler loop ends.
int main(void)
{
    SimplePocoHandler handler("localhost", 5672);

    const AMQP::Login credentials("guest", "guest");
    AMQP::Connection connection(&handler, credentials, "/");
    AMQP::Channel channel(&connection);

    // Delivery callback: only the message body is used; the delivery tag and
    // redelivered flag are ignored.
    const auto printMessage = [](const AMQP::Message &incoming,
                                 uint64_t /*deliveryTag*/,
                                 bool /*redelivered*/)
    {
        std::cout << " [x] Received " << incoming.message() << std::endl;
    };

    channel.declareQueue("hello");
    // Consumer is registered with the AMQP::noack flag, so the callback
    // never acknowledges deliveries itself.
    channel.consume("hello", AMQP::noack).onReceived(printMessage);

    std::cout << " [*] Waiting for messages. To exit press CTRL-C\n";
    handler.loop();
    return 0;
}


send.cpp
#include <iostream>

#include "SimplePocoHandler.h"

int main(void)
{
    SimplePocoHandler handler("localhost", 5672);

    AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");
    AMQP::Channel channel(&connection);

    channel.onReady([&]()
    {
        if(handler.connected())
        {
            channel.publish("", "hello", "Hello World!");
            std::cout << " [x] Sent 'Hello World!'" << std::endl;
            handler.quit();
        }
    });

    handler.loop();
    return 0;
}


«RabbitMQ tutorial 2 — Очередь задач»

worker.cpp
#include <iostream>
#include <algorithm>
#include <thread>
#include <chrono>

#include "SimplePocoHandler.h"

/// Tutorial 2 worker: consumes "task_queue", sleeps one second for every '.'
/// in the message body to simulate work, then acknowledges the delivery.
int main(void)
{
    SimplePocoHandler handler("localhost", 5672);

    const AMQP::Login credentials("guest", "guest");
    AMQP::Connection connection(&handler, credentials, "/");
    AMQP::Channel channel(&connection);

    // QoS of 1 is requested before the queue is declared and consumed.
    channel.setQos(1);
    channel.declareQueue("task_queue", AMQP::durable);

    const auto processTask = [&channel](const AMQP::Message &task,
                                        uint64_t deliveryTag,
                                        bool /*redelivered*/)
    {
        const auto payload = task.message();
        std::cout << " [x] Received " << payload << std::endl;

        // Each dot in the payload stands for one second of simulated work.
        const size_t dots = std::count(payload.cbegin(), payload.cend(), '.');
        const std::chrono::seconds workTime(dots);
        std::this_thread::sleep_for(workTime);

        std::cout << " [x] Done" << std::endl;
        // Acknowledge only after the simulated work has finished.
        channel.ack(deliveryTag);
    };
    channel.consume("task_queue").onReceived(processTask);

    std::cout << " [*] Waiting for messages. To exit press CTRL-C\n";
    handler.loop();
    return 0;
}


new_task.cpp
#include <iostream>

#include "SimplePocoHandler.h"
#include "tools.h"

int main(int argc, const char* argv[])
{
    const std::string msg =
            argc > 1 ? join(&argv[1], &argv[argc], " ") : "Hello World!";

    SimplePocoHandler handler("localhost", 5672);

    AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");
    AMQP::Channel channel(&connection);

    AMQP::QueueCallback callback =
            [&](const std::string &name, int msgcount, int consumercount)
            {
                AMQP::Envelope env(msg);
                env.setDeliveryMode(2);
                channel.publish("", "task_queue", env);
                std::cout<<" [x] Sent '"<<msg<<"'\n";
                handler.quit();
            };

    channel.declareQueue("task_queue", AMQP::durable).onSuccess(callback);
    handler.loop();
    return 0;
}


«RabbitMQ tutorial 3 — Публикация/Подписка»

receive_logs.cpp
#include <iostream>

#include "SimplePocoHandler.h"

int main(void)
{
    SimplePocoHandler handler("localhost", 5672);

    AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");

    AMQP::Channel channel(&connection);
    auto receiveMessageCallback = [](const AMQP::Message &message,
            uint64_t deliveryTag,
            bool redelivered)
    {

        std::cout <<" [x] "<<message.message() << std::endl;
    };

    AMQP::QueueCallback callback =
            [&](const std::string &name, int msgcount, int consumercount)
            {
                channel.bindQueue("logs", name,"");
                channel.consume(name, AMQP::noack).onReceived(receiveMessageCallback);
            };

    AMQP::SuccessCallback success = [&]()
            {
                channel.declareQueue(AMQP::exclusive).onSuccess(callback);
            };

    channel.declareExchange("logs", AMQP::fanout).onSuccess(success);

    std::cout << " [*] Waiting for messages. To exit press CTRL-C\n";
    handler.loop();
    return 0;
}


emit_log.cpp
#include <iostream>

#include "SimplePocoHandler.h"
#include "tools.h"

int main(int argc, const char* argv[])
{
    const std::string msg =
            argc > 1 ? join(&argv[1], &argv[argc], " ") : "info: Hello World!";

    SimplePocoHandler handler("localhost", 5672);

    AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");

    AMQP::Channel channel(&connection);
    channel.declareExchange("logs", AMQP::fanout).onSuccess([&]()
    {
        channel.publish("logs", "", msg);
        std::cout << " [x] Sent "<<msg<< std::endl;
        handler.quit();
    });

    handler.loop();
    return 0;
}


«RabbitMQ tutorial 4 — Роутинг»

receive_logs_direct.cpp
#include <iostream>
#include <algorithm>

#include "SimplePocoHandler.h"

/// Tutorial 4 subscriber: prints messages from the "direct_logs" exchange
/// whose routing key matches one of the severities given on the command line.
///
/// Usage: receive_logs_direct [info] [warning] [error]
/// Returns 1 (after printing the usage line) when no severity is given.
int main(int argc, const char* argv[])
{
    // argc < 2 also covers the argc == 0 case, where argv[0] must not be read.
    if(argc < 2)
    {
        const char* program = argc > 0 ? argv[0] : "receive_logs_direct";
        std::cout<<"Usage: "<<program<<" [info] [warning] [error]"<<std::endl;
        return 1;
    }
    SimplePocoHandler handler("localhost", 5672);

    AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");

    AMQP::Channel channel(&connection);

    channel.declareExchange("direct_logs", AMQP::direct);

    // Prints "<routing key>:<body>" for every delivered message.
    auto receiveMessageCallback =
            [](const AMQP::Message &message,
               uint64_t /*deliveryTag*/,
               bool /*redelivered*/)
            {
                std::cout <<" [x] "
                          <<message.routingKey()
                          <<":"
                          <<message.message()
                          << std::endl;
            };

    // Runs once the exclusive queue has been declared; `name` is the queue
    // name reported to the callback.
    AMQP::QueueCallback callback = [&](const std::string &name,
            int /*msgcount*/,
            int /*consumercount*/)
    {
        // One binding per requested severity, always against the queue that
        // was just declared (named explicitly rather than via an empty name).
        std::for_each(&argv[1],
                &argv[argc],
                [&](const char* severity)
                {
                    channel.bindQueue("direct_logs", name, severity);
                });

        // A single consumer is enough: every binding feeds the same queue.
        channel.consume(name, AMQP::noack).onReceived(receiveMessageCallback);
    };
    channel.declareQueue(AMQP::exclusive).onSuccess(callback);

    std::cout << " [*] Waiting for messages. To exit press CTRL-C\n";
    handler.loop();
    return 0;
}


emit_log_direct.cpp
#include <iostream>

#include "SimplePocoHandler.h"
#include "tools.h"

/// Tutorial 4 publisher: sends one message to the "direct_logs" exchange.
///
/// Usage: emit_log_direct [severity] [message...]
///   severity -- routing key; defaults to "info" when absent.
///   message  -- remaining arguments joined with spaces; defaults to
///               "Hello World!" when absent.
int main(int argc, const char* argv[])
{
    // The severity is taken whenever it is present, even if no message text
    // follows it; previously a lone severity argument was silently dropped.
    const std::string severity = argc > 1 ? argv[1] : "info";
    const std::string msg =
            argc > 2 ? join(&argv[2], &argv[argc], " ") : "Hello World!";

    SimplePocoHandler handler("localhost", 5672);

    AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");

    AMQP::Channel channel(&connection);
    // Publish only after the exchange declaration has succeeded, then stop.
    channel.declareExchange("direct_logs", AMQP::direct).onSuccess([&]()
    {
        channel.publish("direct_logs", severity, msg);
        std::cout << " [x] Sent "<<severity<<":"<<msg<< std::endl;
        handler.quit();
    });

    handler.loop();
    return 0;
}


«RabbitMQ tutorial 5 — Тематики»

receive_logs_topic.cpp
#include <iostream>
#include <algorithm>

#include "SimplePocoHandler.h"

/// Tutorial 5 subscriber: prints messages from the "topic_logs" exchange that
/// match any of the binding keys given on the command line.
///
/// Usage: receive_logs_topic [binding_key]...
/// Returns 1 (after printing the usage line) when no binding key is given.
int main(int argc, const char* argv[])
{
    // argc < 2 also covers the argc == 0 case, where argv[0] must not be read.
    if(argc < 2)
    {
        const char* program = argc > 0 ? argv[0] : "receive_logs_topic";
        std::cout<<"Usage: "<<program<<" [binding_key]..."<<std::endl;
        return 1;
    }
    SimplePocoHandler handler("localhost", 5672);

    AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");

    AMQP::Channel channel(&connection);

    channel.declareExchange("topic_logs", AMQP::topic);

    // Prints "<routing key>:<body>" for every delivered message.
    auto receiveMessageCallback =
            [](const AMQP::Message &message,
               uint64_t /*deliveryTag*/,
               bool /*redelivered*/)
            {
                std::cout <<" [x] "
                          <<message.routingKey()
                          <<":"
                          <<message.message()
                          << std::endl;
            };

    // Runs once the exclusive queue has been declared; `name` is the queue
    // name reported to the callback.
    AMQP::QueueCallback callback = [&](const std::string &name,
            int /*msgcount*/,
            int /*consumercount*/)
    {
        // Echo each binding key and bind the queue to the exchange with it.
        std::for_each(&argv[1],
                &argv[argc],
                [&](const char* bindingKey)
                {
                    std::cout<<bindingKey<<std::endl;
                    channel.bindQueue("topic_logs",name, bindingKey);
                });

        // Register one consumer after all bindings, instead of one consumer
        // per binding key on the same queue.
        channel.consume(name, AMQP::noack).onReceived(receiveMessageCallback);
    };
    channel.declareQueue(AMQP::exclusive).onSuccess(callback);

    std::cout << " [*] Waiting for messages. To exit press CTRL-C\n";
    handler.loop();
    return 0;
}


emit_log_topic.cpp
#include <iostream>

#include "SimplePocoHandler.h"
#include "tools.h"

/// Tutorial 5 publisher: sends one message to the "topic_logs" exchange.
///
/// Usage: emit_log_topic [routing_key] [message...]
///   routing_key -- defaults to "anonymous.info" when absent.
///   message     -- remaining arguments joined with spaces; defaults to
///                  "Hello World!" when absent.
int main(int argc, const char* argv[])
{
    const std::string routing_key = argc > 1 ? argv[1] : "anonymous.info";
    // The message starts at argv[2], so it only exists when argc > 2. With
    // just a routing key the default text is used instead of joining an
    // empty argument range.
    const std::string msg =
            argc > 2 ? join(&argv[2], &argv[argc], " ") : "Hello World!";

    SimplePocoHandler handler("localhost", 5672);

    AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");

    AMQP::Channel channel(&connection);
    // Publish only after the exchange declaration has succeeded, then stop.
    channel.declareExchange("topic_logs", AMQP::topic).onSuccess([&]()
    {
        channel.publish("topic_logs", routing_key, msg);
        std::cout << " [x] Sent "<<routing_key<<":"<<msg<< std::endl;
        handler.quit();
    });

    handler.loop();
    return 0;
}


«RabbitMQ tutorial 6 — Удаленный вызов процедур»

rpc_server.cpp
#include <iostream>
#include <algorithm>
#include <thread>
#include <chrono>

#include "SimplePocoHandler.h"

/// Returns the n-th Fibonacci number (fib(0) == 0, fib(1) == 1).
///
/// @param n  Index in the sequence. Values <= 0 yield 0, so a negative
///           request can no longer recurse without bound.
/// @return   fib(n), computed iteratively in O(n) steps.
///
/// NOTE(review): the result is a plain int; assuming a 32-bit int, indexes
/// above 46 overflow -- confirm whether callers need a larger range.
int fib(int n)
{
    if (n <= 0)
    {
        return 0;
    }

    int previous = 0;
    int current = 1;
    // Invariant at the top of each pass: current == fib(step), previous == fib(step - 1).
    for (int step = 1; step < n; ++step)
    {
        const int next = previous + current;
        previous = current;
        current = next;
    }
    return current;
}

/// Tutorial 6 RPC server: consumes requests from "rpc_queue", computes
/// fib(<body>) and publishes the result to the queue named in the request's
/// reply-to field, echoing the request's correlation id.
int main(void)
{
    SimplePocoHandler handler("localhost", 5672);

    AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");

    AMQP::Channel channel(&connection);
    channel.setQos(1);

    channel.declareQueue("rpc_queue");
    // Consume from the declared queue by name instead of passing an empty
    // queue name and relying on the channel's implicit "current queue".
    channel.consume("rpc_queue").onReceived([&channel](const AMQP::Message &message,
            uint64_t deliveryTag,
            bool /*redelivered*/)
    {
        const auto body = message.message();
        std::cout<<" [.] fib("<<body<<")"<<std::endl;

        // NOTE(review): std::stoi throws on a non-numeric or out-of-range
        // body and nothing here catches it -- decide how malformed requests
        // should be handled.
        AMQP::Envelope env(std::to_string(fib(std::stoi(body))));
        env.setCorrelationID(message.correlationID());

        channel.publish("", message.replyTo(), env);
        // Acknowledge only after the reply has been published.
        channel.ack(deliveryTag);
    });

    std::cout << " [x] Awaiting RPC requests" << std::endl;
    handler.loop();
    return 0;
}


rpc_client.cpp
#include <iostream>

#include "tools.h"
#include "SimplePocoHandler.h"

/// Tutorial 6 RPC client: declares an exclusive reply queue, sends the request
/// "30" to "rpc_queue" and prints the reply whose correlation id matches the
/// one generated for this run, then stops the handler loop.
///
/// Command-line arguments are accepted but not used.
int main(int /*argc*/, const char* /*argv*/[])
{
    const std::string correlation(uuid());

    SimplePocoHandler handler("localhost", 5672);

    AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");

    AMQP::Channel channel(&connection);

    // Reply handler: ignore anything that does not carry our correlation id.
    auto receiveCallback = [&](const AMQP::Message &message,
            uint64_t /*deliveryTag*/,
            bool /*redelivered*/)
    {
        if(message.correlationID() != correlation)
            return;

        std::cout<<" [.] Got "<<message.message()<<std::endl;
        handler.quit();
    };

    // Runs once the exclusive reply queue has been declared; `name` is the
    // queue name reported to the callback.
    AMQP::QueueCallback callback = [&](const std::string &name,
            int /*msgcount*/,
            int /*consumercount*/)
    {
        // Subscribe to the reply queue by its actual name (rather than an
        // empty queue name) before the request is sent.
        channel.consume(name, AMQP::noack).onReceived(receiveCallback);

        AMQP::Envelope env("30");
        env.setCorrelationID(correlation);
        env.setReplyTo(name);
        channel.publish("","rpc_queue",env);
        std::cout<<" [x] Requesting fib(30)"<<std::endl;
    };
    channel.declareQueue(AMQP::exclusive).onSuccess(callback);

    handler.loop();
    return 0;
}


UPD 2015.04.09, исправления: prefetch count теперь устанавливается корректно; исправлена работа tutorial 2; код собирается под g++ 4.7
Tags:
Hubs:
If this publication inspired you and you want to support the author, do not hesitate to click on the button
+12
Comments1

Articles