您好,我使用RabbitMQ CPP客户端( 试验了一下,一直无法连接上MQ(节点0的broker地址是amqp://guest:guest@localhost/thb-chain/0),exchange名称用的是Consensus,Queue名称用chain,routing_key使用“Consensus >> BlockWithProof“,用于接收来自共识模块的消息。


#include <amqpcpp.h>
#include <amqpcpp/linux_tcp.h>
#include <unistd.h>
#include <iostream>
using namespace std;

class MyTcpHandler : public AMQP::TcpHandler
     *  Method that is called by the AMQP library when a new connection
     *  is associated with the handler. This is the first call to your handler
     *  @param  connection      The connection that is attached to the handler
    virtual void onAttached(AMQP::TcpConnection *connection) override
        // @todo
        //  add your own implementation, for example initialize things
        //  to handle the connection.

     *  Method that is called by the AMQP library when the TCP connection 
     *  has been established. After this method has been called, the library
     *  still has take care of setting up the optional TLS layer and of
     *  setting up the AMQP connection on top of the TCP layer., This method 
     *  is always paired with a later call to onLost().
     *  @param  connection      The connection that can now be used
    virtual void onConnected(AMQP::TcpConnection *connection) override
        // @todo
        //  add your own implementation (probably not needed)
        cout<<"Connect success"<<endl;

     *  Method that is called when the secure TLS connection has been established. 
     *  This is only called for amqps:// connections. It allows you to inspect
     *  whether the connection is secure enough for your liking (you can
     *  for example check the server certicate). The AMQP protocol still has
     *  to be started.
     *  @param  connection      The connection that has been secured
     *  @param  ssl             SSL structure from openssl library
     *  @return bool            True if connection can be used
    virtual bool onSecured(AMQP::TcpConnection *connection, const SSL *ssl) override
        // @todo
        //  add your own implementation, for example by reading out the
        //  certificate and check if it is indeed yours
        return true;

     *  Method that is called by the AMQP library when the login attempt
     *  succeeded. After this the connection is ready to use.
     *  @param  connection      The connection that can now be used
    virtual void onReady(AMQP::TcpConnection *connection) override
        // @todo
        //  add your own implementation, for example by creating a channel
        //  instance, and start publishing or consuming

     *  Method that is called by the AMQP library when a fatal error occurs
     *  on the connection, for example because data received from RabbitMQ
     *  could not be recognized, or the underlying connection is lost. This
     *  call is normally followed by a call to onLost() (if the error occured
     *  after the TCP connection was established) and onDetached().
     *  @param  connection      The connection on which the error occured
     *  @param  message         A human readable error message
    virtual void onError(AMQP::TcpConnection *connection, const char *message) override
        // @todo
        //  add your own implementation, for example by reporting the error
        //  to the user of your program and logging the error

     *  Method that is called when the AMQP protocol is ended. This is the
     *  counter-part of a call to connection.close() to graceful shutdown
     *  the connection. Note that the TCP connection is at this time still 
     *  active, and you will also receive calls to onLost() and onDetached()
     *  @param  connection      The connection over which the AMQP protocol ended
    virtual void onClosed(AMQP::TcpConnection *connection) override 
        // @todo
        //  add your own implementation (probably not necessary, but it could
        //  be useful if you want to do some something immediately after the
        //  amqp connection is over, but do not want to wait for the tcp 
        //  connection to shut down

     *  Method that is called when the TCP connection was closed or lost.
     *  This method is always called if there was also a call to onConnected()
     *  @param  connection      The connection that was closed and that is now unusable
    virtual void onLost(AMQP::TcpConnection *connection) override 
        // @todo
        //  add your own implementation (probably not necessary)

     *  Final method that is called. This signals that no further calls to your
     *  handler will be made about the connection.
     *  @param  connection      The connection that can be destructed
    virtual void onDetached(AMQP::TcpConnection *connection) override 
        // @todo
        //  add your own implementation, like cleanup resources or exit the application

     *  Method that is called by the AMQP-CPP library when it wants to interact
     *  with the main event loop. The AMQP-CPP library is completely non-blocking,
     *  and only make "write()" or "read()" system calls when it knows in advance
     *  that these calls will not block. To register a filedescriptor in the
     *  event loop, it calls this "monitor()" method with a filedescriptor and
     *  flags telling whether the filedescriptor should be checked for readability
     *  or writability.
     *  @param  connection      The connection that wants to interact with the event loop
     *  @param  fd              The filedescriptor that should be checked
     *  @param  flags           Bitwise or of AMQP::readable and/or AMQP::writable
    virtual void monitor(AMQP::TcpConnection *connection, int fd, int flags) override
        // @todo
        //  add your own implementation, for example by adding the file
        //  descriptor to the main application event loop (like the select() or
        //  poll() loop). When the event loop reports that the descriptor becomes
        //  readable and/or writable, it is up to you to inform the AMQP-CPP
        //  library that the filedescriptor is active by calling the
        //  connection->process(fd, flags) method.

int main(){

    // create an instance of your own tcp handler
    MyTcpHandler myHandler;

    // address of the server
    AMQP::Address address("amqp://guest:guest@localhost/thb-chain/0");

    // create a AMQP connection object
    AMQP::TcpConnection connection(&myHandler, address);

    // and create a channel
    AMQP::TcpChannel channel(&connection);

    // use the channel object to call the AMQP method you like
    channel.declareExchange("Chain", AMQP::topic);
    channel.bindQueue("Chain", "chain", "Consensus >> BlockWithProof");

    // callback function that is called when the consume operation starts
    auto startCb = [](const std::string &consumertag)
        std::cout << "consume operation started" << std::endl;

    // callback function that is called when the consume operation failed
    auto errorCb = [](const char *message)
        std::cout << "consume operation failed" << std::endl;

    // callback operation when a message was received
    auto messageCb = [&channel](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) {
    std::cout << "message received" << std::endl;

    // acknowledge the message


        cout<<"In the loop..."<<endl;
        // start consuming from the queue, and install the callbacks
    return 0;

exchange的名字是 cita,类型是topic,queue的名字是自己定义的。




  1. BasicConsumeMessage消费消息后得到的buffer打印出来有内容,但是反序列化失败,只得到一个空指针,在我本地测试创建一个block_with_proof类型数据进行序列化/反序列化是成功的。
  2. 运行bin/cita start test-chain/0后,通过rabbitmq浏览器查看消息的message body bytes的大小都是0B,是否有问题?所有微服务检查过都已经启动。


#include <SimpleAmqpClient/SimpleAmqpClient.h> 
#include <iostream>
#include <string>
#include "blockchain.pb.h"
using namespace std;

int main() {
  std::string queue_name = "hello";
  AmqpClient::Channel::ptr_t channel = AmqpClient::Channel::Create("localhost",5672,"guest","guest","test-chain/0");
  channel->DeclareQueue(queue_name, false, true, false, false);

  std::string consumer_tag = channel->BasicConsume(queue_name, "");

  while (1) {
    std::cout << "[y] wait for the message" << std::endl;
    AmqpClient::Envelope::ptr_t envelope = channel->BasicConsumeMessage(consumer_tag);
    std::string buffer = envelope->Message()->Body();


    BlockWithProof deserializedBwf;

       std::cout << "Failed to parse BlockWithProof." << endl;
       return -1;
    // cout<<"反序列化成功!"<<endl;
    std::cout << "[y] receve " << buffer << std::endl;


FYI,我编辑了下内容,做了代码格式化,有代码内容时可以使用一对 ``` 符号包裹代码,方便交流

谢谢,请问像上面这样拿到数据后直接用protobuf发序列化正确吗?现在一直是反序列化失败,换用CITA Java SDK还是失败 cita-sdk-java反序列化(出了第一行,下面都是从blockchain.java文件中拷贝出来的)。是不是遗漏了哪一点?