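If I want to add a new microservice to CITA (the microservice is written in C++), how do I integrate it?
Adding a new microservice to CITA
CITA's microservices communicate with each other over MQ. The broker address is in the .env file in each node's directory. For example: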
AMQP_URL=amqp://guest:guest@localhost/test-chain/0
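Communication uses the pub/sub pattern.
The comment above the main function of each existing microservice describes the message types that the microservice publishes and subscribes to.
A new microservice only needs to connect to the same broker and subscribe to the messages it is interested in.
The data types for the messages are defined with protobuf, at https://github.com/citahub/cita-proto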
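For anyone wiring up a client by hand, it may help to see how the AMQP_URL above breaks down, since some client libraries take the host, credentials, and virtual host as separate arguments. The sketch below is only an illustration: `AmqpUrl` and `parseAmqpUrl` are hypothetical names, and it assumes that everything after the first slash following the host is the virtual host (so `test-chain/0` is taken as one vhost name) and that the default port 5672 applies when none is given.

```cpp
#include <cassert>
#include <string>

// Pieces of an AMQP URL of the form amqp://user:password@host[:port]/vhost.
struct AmqpUrl {
    std::string user;
    std::string password;
    std::string host;
    int port = 5672;  // assumed default AMQP port
    std::string vhost;
};

// Hypothetical helper: splits the URL with plain string operations.
// Returns false when the text does not have the expected shape.
bool parseAmqpUrl(const std::string &url, AmqpUrl &out) {
    const std::string scheme = "amqp://";
    if (url.compare(0, scheme.size(), scheme) != 0) return false;

    // Credentials sit between the scheme and '@'.
    const std::size_t at = url.find('@', scheme.size());
    if (at == std::string::npos) return false;
    const std::string credentials = url.substr(scheme.size(), at - scheme.size());
    const std::size_t colon = credentials.find(':');
    if (colon == std::string::npos) return false;
    out.user = credentials.substr(0, colon);
    out.password = credentials.substr(colon + 1);

    // Host (and optional port) run up to the first '/' after '@'.
    const std::size_t slash = url.find('/', at + 1);
    if (slash == std::string::npos) return false;
    std::string hostPort = url.substr(at + 1, slash - at - 1);
    out.port = 5672;
    const std::size_t portColon = hostPort.find(':');
    if (portColon != std::string::npos) {
        out.port = std::stoi(hostPort.substr(portColon + 1));  // throws on non-numeric text
        hostPort.erase(portColon);
    }
    out.host = hostPort;

    // Assumption: the whole remainder, slashes included, is the vhost.
    out.vhost = url.substr(slash + 1);
    return !out.host.empty();
}
```

With the example URL, this yields user `guest`, host `localhost`, and vhost `test-chain/0`.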
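Hello, I tried the RabbitMQ C++ client AMQP-CPP (https://github.com/CopernicaMarketingSoftware/AMQP-CPP), but I can never connect to the MQ (the broker address of node 0 is amqp://guest:guest@localhost/thb-chain/0). I used Consensus as the exchange name, chain as the queue name, and "Consensus >> BlockWithProof" as the routing_key, in order to receive messages from the consensus module.
The code is as follows: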
#include <amqpcpp.h>
#include <amqpcpp/linux_tcp.h>
#include <unistd.h>
#include <iostream>
using namespace std;
class MyTcpHandler : public AMQP::TcpHandler
{
/**
* Method that is called by the AMQP library when a new connection
* is associated with the handler. This is the first call to your handler
* @param connection The connection that is attached to the handler
*/
virtual void onAttached(AMQP::TcpConnection *connection) override
{
// @todo
// add your own implementation, for example initialize things
// to handle the connection.
}
/**
* Method that is called by the AMQP library when the TCP connection
* has been established. After this method has been called, the library
* still has to take care of setting up the optional TLS layer and of
* setting up the AMQP connection on top of the TCP layer. This method
* is always paired with a later call to onLost().
* @param connection The connection that can now be used
*/
virtual void onConnected(AMQP::TcpConnection *connection) override
{
// @todo
// add your own implementation (probably not needed)
cout<<"Connect success"<<endl;
}
/**
* Method that is called when the secure TLS connection has been established.
* This is only called for amqps:// connections. It allows you to inspect
* whether the connection is secure enough for your liking (you can
* for example check the server certificate). The AMQP protocol still has
* to be started.
* @param connection The connection that has been secured
* @param ssl SSL structure from openssl library
* @return bool True if connection can be used
*/
virtual bool onSecured(AMQP::TcpConnection *connection, const SSL *ssl) override
{
// @todo
// add your own implementation, for example by reading out the
// certificate and check if it is indeed yours
return true;
}
/**
* Method that is called by the AMQP library when the login attempt
* succeeded. After this the connection is ready to use.
* @param connection The connection that can now be used
*/
virtual void onReady(AMQP::TcpConnection *connection) override
{
// @todo
// add your own implementation, for example by creating a channel
// instance, and start publishing or consuming
}
/**
* Method that is called by the AMQP library when a fatal error occurs
* on the connection, for example because data received from RabbitMQ
* could not be recognized, or the underlying connection is lost. This
* call is normally followed by a call to onLost() (if the error occurred
* after the TCP connection was established) and onDetached().
* @param connection The connection on which the error occurred
* @param message A human readable error message
*/
virtual void onError(AMQP::TcpConnection *connection, const char *message) override
{
// @todo
// add your own implementation, for example by reporting the error
// to the user of your program and logging the error
}
/**
* Method that is called when the AMQP protocol is ended. This is the
* counter-part of a call to connection.close() to graceful shutdown
* the connection. Note that the TCP connection is at this time still
* active, and you will also receive calls to onLost() and onDetached()
* @param connection The connection over which the AMQP protocol ended
*/
virtual void onClosed(AMQP::TcpConnection *connection) override
{
// @todo
// add your own implementation (probably not necessary, but it could
// be useful if you want to do something immediately after the
// amqp connection is over, but do not want to wait for the tcp
// connection to shut down
}
/**
* Method that is called when the TCP connection was closed or lost.
* This method is always called if there was also a call to onConnected()
* @param connection The connection that was closed and that is now unusable
*/
virtual void onLost(AMQP::TcpConnection *connection) override
{
// @todo
// add your own implementation (probably not necessary)
}
/**
* Final method that is called. This signals that no further calls to your
* handler will be made about the connection.
* @param connection The connection that can be destructed
*/
virtual void onDetached(AMQP::TcpConnection *connection) override
{
// @todo
// add your own implementation, like cleanup resources or exit the application
}
/**
* Method that is called by the AMQP-CPP library when it wants to interact
* with the main event loop. The AMQP-CPP library is completely non-blocking,
* and only make "write()" or "read()" system calls when it knows in advance
* that these calls will not block. To register a filedescriptor in the
* event loop, it calls this "monitor()" method with a filedescriptor and
* flags telling whether the filedescriptor should be checked for readability
* or writability.
*
* @param connection The connection that wants to interact with the event loop
* @param fd The filedescriptor that should be checked
* @param flags Bitwise or of AMQP::readable and/or AMQP::writable
*/
virtual void monitor(AMQP::TcpConnection *connection, int fd, int flags) override
{
// @todo
// add your own implementation, for example by adding the file
// descriptor to the main application event loop (like the select() or
// poll() loop). When the event loop reports that the descriptor becomes
// readable and/or writable, it is up to you to inform the AMQP-CPP
// library that the filedescriptor is active by calling the
// connection->process(fd, flags) method.
}
};
int main(){
// create an instance of your own tcp handler
MyTcpHandler myHandler;
// address of the server
AMQP::Address address("amqp://guest:guest@localhost/thb-chain/0");
// create a AMQP connection object
AMQP::TcpConnection connection(&myHandler, address);
// and create a channel
AMQP::TcpChannel channel(&connection);
// use the channel object to call the AMQP method you like
channel.declareExchange("Chain", AMQP::topic);
channel.declareQueue("chain");
channel.bindQueue("Chain", "chain", "Consensus >> BlockWithProof");
// callback function that is called when the consume operation starts
auto startCb = [](const std::string &consumertag)
{
std::cout << "consume operation started" << std::endl;
};
// callback function that is called when the consume operation failed
auto errorCb = [](const char *message)
{
std::cout << "consume operation failed" << std::endl;
};
// callback operation when a message was received
auto messageCb = [&channel](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) {
std::cout << "message received" << std::endl;
// acknowledge the message
channel.ack(deliveryTag);
};
while(1){
usleep(100000);
cout<<"In the loop..."<<endl;
// start consuming from the queue, and install the callbacks
channel.consume("chain")
.onReceived(messageCb)
.onSuccess(startCb)
.onError(errorCb);
}
return 0;
}
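One thing that stands out in the code above: `monitor()` is still an empty stub, and the comment on it says it is up to the application to watch the file descriptor and call `connection->process(fd, flags)` when it becomes active. The `while` loop in `main` only sleeps, so nothing ever drives the socket. Below is a rough sketch of the kind of loop that comment describes, using plain `poll()`. `FdLoop`, `kReadable`, and `kWritable` are hypothetical names standing in for the library's own flags, and treating `flags == 0` as "stop watching" is an assumption of this sketch.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <vector>
#include <poll.h>
#include <unistd.h>

// Stand-ins for the library's readable/writable flags (values chosen by this sketch).
constexpr int kReadable = 1;
constexpr int kWritable = 2;

// Minimal poll()-based registry of file descriptors.
class FdLoop {
public:
    // Intended to be called from the handler's monitor().
    // Assumption: flags == 0 means the descriptor should no longer be watched.
    void monitor(int fd, int flags) {
        if (flags == 0) watched_.erase(fd);
        else watched_[fd] = flags;
    }

    // Waits up to timeoutMs and reports each active descriptor to onActive.
    // Returns the number of descriptors reported.
    int runOnce(int timeoutMs, const std::function<void(int, int)> &onActive) {
        std::vector<pollfd> fds;
        for (const auto &entry : watched_) {
            pollfd p{};
            p.fd = entry.first;
            if (entry.second & kReadable) p.events |= POLLIN;
            if (entry.second & kWritable) p.events |= POLLOUT;
            fds.push_back(p);
        }
        if (fds.empty()) return 0;
        if (poll(fds.data(), fds.size(), timeoutMs) <= 0) return 0;

        int reported = 0;
        for (const pollfd &p : fds) {
            int flags = 0;
            // Hang-ups and errors are reported as readable so the owner notices them.
            if (p.revents & (POLLIN | POLLHUP | POLLERR)) flags |= kReadable;
            if (p.revents & POLLOUT) flags |= kWritable;
            if (flags != 0) {
                onActive(p.fd, flags);
                ++reported;
            }
        }
        return reported;
    }

private:
    std::map<int, int> watched_;  // fd -> requested flags
};
```

In the handler, `monitor()` would forward to `loop.monitor(fd, flags)`, and `main` would call `channel.consume(...)` once and then repeatedly run something like `loop.runOnce(1000, [&](int fd, int flags) { connection.process(fd, flags); });`. Printing `message` inside `onError()` would probably also make connection failures easier to see.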
The exchange name is cita and its type is topic; the queue name is whatever you choose to define.
For the relevant code in CITA, see:
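Since the exchange is a topic exchange, the binding key decides which messages reach your queue. As a reminder of the standard AMQP topic rules (words separated by `.`, `*` matching exactly one word, `#` matching zero or more words), here is a small stand-alone matcher. It is only an illustration of the matching rules, not code from CITA or from any client library; `topicMatches` and its helpers are hypothetical names.

```cpp
#include <cassert>
#include <sstream>
#include <string>
#include <vector>

// Splits a key such as "consensus.block_with_proof" into its dot-separated words.
static std::vector<std::string> splitWords(const std::string &key) {
    std::vector<std::string> words;
    std::stringstream stream(key);
    std::string word;
    while (std::getline(stream, word, '.')) words.push_back(word);
    return words;
}

// Recursive matcher: p indexes the binding pattern, k indexes the routing key.
static bool matchFrom(const std::vector<std::string> &pattern, std::size_t p,
                      const std::vector<std::string> &key, std::size_t k) {
    if (p == pattern.size()) return k == key.size();
    if (pattern[p] == "#") {
        // '#' may absorb zero or more words, so try every possible split.
        for (std::size_t next = k; next <= key.size(); ++next) {
            if (matchFrom(pattern, p + 1, key, next)) return true;
        }
        return false;
    }
    if (k == key.size()) return false;
    // '*' stands for exactly one word; anything else must match literally.
    if (pattern[p] != "*" && pattern[p] != key[k]) return false;
    return matchFrom(pattern, p + 1, key, k + 1);
}

// Returns true if a message published with routingKey would be delivered
// to a queue bound with bindingKey on a topic exchange.
bool topicMatches(const std::string &bindingKey, const std::string &routingKey) {
    return matchFrom(splitWords(bindingKey), 0, splitWords(routingKey), 0);
}
```

Under these rules, a binding key of `consensus.*` or `#` would match a routing key like `consensus.block_with_proof`, while a binding key written as `Consensus >> BlockWithProof` would only match a routing key spelled exactly that way.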
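Thank you very much for the reply. I experimented with fetching block_with_proof data from test-chain/0 and have a few questions.
My RabbitMQ client is SimpleAmqpClient (link: https://github.com/alanxz/SimpleAmqpClient)
For deserialization I use protobuf (link: https://github.com/protobuf-c/protobuf-c)
The proto file is blockchain.proto from CITA
The points that confuse me are:
- The buffer obtained by consuming a message with BasicConsumeMessage has content when printed, but deserialization fails and I only get a null pointer. In a local test, creating a block_with_proof value and serializing/deserializing it works.
- After running bin/cita start test-chain/0, the message body bytes shown for every message in the RabbitMQ browser is 0B. Is that a problem? I have checked that all microservices are started.
The code is as follows: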
#include <SimpleAmqpClient/SimpleAmqpClient.h>
#include <iostream>
#include <string>
#include "blockchain.pb.h"
using namespace std;
int main() {
std::string queue_name = "hello";
AmqpClient::Channel::ptr_t channel = AmqpClient::Channel::Create("localhost",5672,"guest","guest","test-chain/0");
channel->DeclareQueue(queue_name, false, true, false, false);
channel->BindQueue(queue_name,"cita","consensus.block_with_proof");
std::string consumer_tag = channel->BasicConsume(queue_name, "");
// The second argument is the consumer tag; the return value is also the consumer tag.
while (1) {
std::cout << "[y] wait for the message" << std::endl;
AmqpClient::Envelope::ptr_t envelope = channel->BasicConsumeMessage(consumer_tag);
std::string buffer = envelope->Message()->Body();
// The message is wrapped in an envelope and has to be extracted.
cout<<buffer<<endl;
// Start decoding with protobuf.
BlockWithProof deserializedBwf;
// Parse the serialized message object, i.e. deserialize it.
if(!deserializedBwf.ParseFromString(buffer)){
std::cout << "Failed to parse BlockWithProof." << endl;
return -1;
}
// cout<<"Deserialization succeeded!"<<endl;
std::cout << "[y] receive " << buffer << std::endl;
}
channel->BasicCancel(consumer_tag);
// Cancel the consumer.
}
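A note on debugging the code above: sending a binary buffer straight to `cout` is hard to read and can hide leading bytes. A hex dump of the first few bytes makes it easier to compare what arrives from the queue with what a locally serialized BlockWithProof looks like. The helper below is a minimal sketch (`hexPrefix` is a hypothetical name). If the two dumps start differently, that would suggest the body on the queue is not a bare BlockWithProof, which is worth checking before assuming the protobuf step itself is at fault.

```cpp
#include <cassert>
#include <string>

// Renders up to maxBytes of a binary buffer as space-separated hex pairs,
// for example "0a 02 68 69".
std::string hexPrefix(const std::string &buffer, std::size_t maxBytes) {
    static const char digits[] = "0123456789abcdef";
    std::string out;
    const std::size_t count = buffer.size() < maxBytes ? buffer.size() : maxBytes;
    for (std::size_t i = 0; i < count; ++i) {
        // Go through unsigned char so bytes >= 0x80 are not sign-extended.
        const unsigned char byte = static_cast<unsigned char>(buffer[i]);
        if (i != 0) out.push_back(' ');
        out.push_back(digits[byte >> 4]);
        out.push_back(digits[byte & 0x0f]);
    }
    return out;
}
```

In the consumer loop this could be used as `std::cout << hexPrefix(buffer, 32) << std::endl;`, next to the same call on the output of a local `SerializeToString`.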
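FYI, I edited the post and formatted the code. When a post contains code, you can wrap it in a pair of ``` markers, which makes it easier to discuss.
Thanks. Is it correct to deserialize with protobuf directly after getting the data as shown above? Deserialization keeps failing, and switching to the CITA Java SDK fails as well: cita-sdk-java deserialization (apart from the first line, everything below is copied from the blockchain.java file). Am I missing something?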