异步定时器
首先来看一个异步的定时器:
void idle(boost::system::error_code const& ec, asio::steady_timer* timer) {
if(ec) {
spdlog::error("运行 idle 循环出错:{}", ec.message());
return;
}
timer->expires_at(timer->expiry() + std::chrono::seconds(1)); (3)
timer->async_wait(boost::bind(idle, asio::placeholders::error, timer));
}
int main(int, char**) {
asio::io_context io; (1)
asio::steady_timer t(io, std::chrono::seconds(1));
t.async_wait(boost::bind(idle, asio::placeholders::error, &t)); (2)
io.run();
return 0;
}
1 | 首先创建一个 io_context 对象。io_context 具备以下特点:
|
2 | 创建一个定时器并进入异步等待。Asio 的定时器都是单次的,只会被运行一次 |
3 | 从 timer 对象中拿到上次过期时间,并根据过期时间计算出下一个需要激活的 时间点 。注意这里使用的 expires_at 代表了一个时间点 |
在上面的异步定时器中,首先创建一个定时器,然后定时器再 io.run()
中被执行。当定时器超时事件发生时,回调函数被执行,然后回调函数再次激活定时器,从而创建一个不断执行的定时器,也保证了 io_context 一直有任务在执行,不会被退出。
异步 TCP Echo 套接字
TCP Echo 套接字是一个简单的服务器例子。UDP 也与其类似
#include <cstdlib>
#include <iostream>
#include <memory>
#include <utility>
#include "asio.hpp"
using asio::ip::tcp;
class session : public std::enable_shared_from_this<session> {
public:
session(tcp::socket socket) : socket_(std::move(socket)) { }
void start() {
do_read();
}
private:
void do_read() {
auto self(shared_from_this());
socket_.async_read_some(asio::buffer(data_, max_length), [this, self](std::error_code ec, std::size_t length) { (2)
if(!ec) {
do_write(length);
}
});
}
void do_write(std::size_t length) {
auto self(shared_from_this());
asio::async_write(socket_, asio::buffer(data_, length),
[this, self](std::error_code ec, std::size_t /*length*/) {
if(!ec) {
do_read();
}
});
}
tcp::socket socket_;
enum { max_length = 1024 };
char data_[max_length];
};
class server {
public:
server(asio::io_context &io_context) : acceptor_(io_context, tcp::endpoint(tcp::v4(), 9000)) { (1)
do_accept();
}
private:
void do_accept() {
acceptor_.async_accept([this](std::error_code ec, tcp::socket socket) {
if(!ec) {
std::make_shared<session>(std::move(socket))->start();
}
do_accept();
});
}
tcp::acceptor acceptor_;
};
int main(int argc, char *argv[]) {
asio::io_context io_context;
server s(io_context);
io_context.run();
return 0;
}
和异步定时器类似,套接字也是运行在 io_context 上的。TCP 套接字在 Asio 中被分为监听套接字(tcp::acceptor
)和普通套接字(tcp::socket
)。监听套接字被用来接收连接,普通套接字被用来处理连入的连接。
1 | 此处传入参数来构建一个监听套接字。然后进行异步监听。由于在 async_accept 的回调函数中又调用了 async_accept ,因而保证了监听套接字一直有任务 |
2 | 使用闭包管理套接字的生命周期 |
在上面的例子中,使用构造函数创建了监听套接字,这种方式下监听套接字会自动打开,但是在某些场景下可能需要手动打开或关闭套接字,这时可以使用下面的方法:
boost::system::error_code ec;
tcp::endpoint endpoint(tcp::v4(), 9000);
acceptor_.open(endpoint.protocol(), ec);
handle_err(ec);
acceptor_.bind(endpoint, ec);
handle_err(ec);
acceptor_.listen();
UDP 的也与此类似:
boost::system::error_code ec;
auto address = asio::ip::make_address("127.0.0.1", ec);
handle_err(ec);
udp::endpoint endpoint(address, 9000);
socket_.open(endpoint.protocol(), ec);
handle_err(ec);
socket_.bind(endpoint, ec);
handle_err(ec);
套接字选项
TCP 和 UDP 设置套接字的方式大同小异,其中最常见的一个选项为端口复用,其有两种设置方式:
在构造函数中设置
tcp::acceptor acceptor(io_context, endpoint, true /* 打开端口复用 */);
使用普通的设置方式
acceptor_.open(endpoint.protocol(), ec); handle_err(ec); acceptor_.set_option(tcp::socket::reuse_address(true), ec); handle_err(ec); acceptor_.bind(endpoint, ec); handle_err(ec);
基于协程的异步套接字
如果项目中可以使用 Boost::coroutine
,那么就可以使用基于协程的套接字。使用基于协程的套接字,极大地降低了代码的复杂度:
asio::spawn(io_context_, [&](asio::yield_context yield_ctx) { (1)
while(acceptor_.is_open()) {
boost::system::error_code ec;
auto socket = std::make_shared<tcp::socket>(io_context_);
acceptor_.async_accept(*socket, yield_ctx[ec]); (2)
handle_err(ec);
asio::spawn(io_context_.get_executor(), [this, socket](asio::yield_context yield) { (3)
this->accept_socket(socket, yield_ctx);
}, asio::detached);
}
});
1 | 创建一个新的协程 |
2 | 在 yield_ctx 下执行异步操作 |
3 | 在新协程中操作接收套接字 |
如上所示,通过使用协程,避免了回调函数的相互嵌套,简化了开发流程。对于 asio::spawn
可以简单地将其比作新线程。
异步操作
除了普通的异步方法外,Asio 还提供了其它方法来讲我们自己的方法变成异步操作,具体为以下两个方法:
方法 | 是否阻塞 | 作用 |
---|---|---|
io_context::strand::defer | 否 | 函数对象在此函数结束后才会被调度 |
asio::dispatch | 否 | 提交一个完成令牌或函数对象执行 |
asio::post | 否 | 提交一个完成令牌或函数对象执行 |
读取数据到变量中
asio::buffer
具有第二种重载方式:
mutable_buffer buffer(
const mutable_buffer & b,
std::size_t max_size_in_bytes);
因此,如果需要将变量读取到变量中,方式为:
uint64_t data_len;
socket_.async_receive(asio::buffer(&data_len, sizeof(data_len)), yield_ctx[ec]);
异步文件
Asio 通过类 asio::stream_file
完成了对文件异步读写的支持,使用的方式为:
asio::stream_file file;
file.open(file_name,
asio::stream_file::create |
asio::stream_file::write_only |
asio::stream_file::append, ec
);
if(ec) {
spdlog::error("打开日志文件失败:{}", ec.message());
return;
}
asio::async_write(file, asio::buffer(res, res.size()), yield_ctx[ec]);
多播
使用多播时有一些不同。现在假设多播地址为 address:port,然后对于接收者和发送者的操作分别为:
-- sender
create_udp_socket()
join_multicast_group()
send_to(address:port)
-- recever
create_udp_socket()
listen("0.0.0.0":port)
join_multicast_group()
recv()
当一个套接字加入一个组播后,此套接字就可以收到来自此组播地址的数据。然后使用绑定时的端口对数据进行过滤。因此这里 listen 的唯一作用就是对组播地址进行过滤
由于绑定的地址是“通配地址”,因此只有端口过滤起作用。也就是说,在上面的 demo 中,recever 可用得到所有发往 port 的数据,无论组播地址是什么
更好的办法是直接绑定组播地址,但是 windows 无法绑定组播地址 。
在单机测试中,可用通过绑定发送端的 ip 来解决。但是单机中发送端 ip 和接收端 ip 相同,不知道实际应该如何
自动拆包
Asio 的 read_until 函数可用实现对 TCP 数据流的自动拆包。自动拆包主要用到了下面的重载形式:
async_read_until(
AsyncReadStream & s,
DynamicBuffer_v1 && buffers,
MatchCondition match_condition
)
其中 MatchCondition 有两种形式:函数抑或是类。由于函数其本身限制,下面只讲解类的方式。
要使用类的方式实现 MatchCondiction,有两个要点:
实现
std::pair<iterator, bool> operator()(iterator begin, iterator end)
方法使用
template<> struct boost::asio::is_match_condition<HeaderParser> : public boost::true_type { };
注册类型
下面是一个 MatchCondition 的实现例子:
下面的例子中,协议的规定为:
data_size\n file_name\n file_data
其中 data_size + file_name 的长度不能超过 1024 字节
class HeaderParser {
public:
HeaderParser(std::shared_ptr<tcp::socket> sck, std::string& file_name, uint64_t& data_begin)
: sck_(sck), file_name_(file_name), data_begin_(data_begin) { }
using iterator = asio::buffers_iterator<asio::streambuf::const_buffers_type>;
std::pair<iterator, bool> operator()(iterator begin, iterator end) {
do {
if(end - begin <= 2) break;
auto end_it = end - begin >= 1024 ? begin + 1024 : end;
auto pos1 = std::find(begin, end_it, '\n');
auto pos2 = std::find(pos1 + 1, end_it, '\n');
if(pos2 == end_it) {
if(end - begin > 1024) { // 1024 字节内如果还没完成报文头,则认定数据非法
sck_->close();
}
break;
}
auto data_size = std::stoull(std::string { pos1 + 1, pos2 + 1 });
if(end - pos2 - 1 < data_size) {
return { begin, false };
}
file_name_ = { begin, pos1 + 1 };
bf::trim(file_name_);
spdlog::info("解析时文件的大小为:{}", end - pos2 - 1);
data_begin_ = pos2 - begin + 1;
spdlog::info("数据的偏移量为:{}", data_begin_);
return { end, true }; // 返回数据的起始点
} while(false);
return { begin, false };
}
private:
std::shared_ptr<tcp::socket> sck_;
std::string& file_name_;
uint64_t& data_begin_;
};
// 注册类型
template<>
struct boost::asio::is_match_condition<HeaderParser> : public boost::true_type { };
使用方式为:
std::string file_name;
uint64_t data_begin;
asio::async_read_until(*sck, asio::dynamic_buffer(data),
HeaderParser(sck, file_name, data_begin), yield_ctx[ec]);
异步文件
Asio 通过 asio::stream_file
实现了异步的文件访问操作。下面是一个例子:
asio::stream_file file { ctx_ };
file.open(file_name,
asio::file_base::create | asio::file_base::write_only | asio::file_base::truncate,
ec);
if(ec) {
spdlog::error("文件打开失败:{}, {}", ec.to_string(), ec.message());
return;
}
spdlog::info("准备写入文件 {}", file_name);
auto beg = &data[0] + data_begin;
asio::async_write(file, asio::buffer(beg, data.size() - data_begin), yield_ctx[ec]);
if(ec) {
spdlog::error("写入文件时发生错误:{}, {}", ec.to_string(), ec.message());
}
多线程
asio 的 io_service::run
可以在多个线程调用。从而实现多线程请求。
在进行多线程中,只有调用了 io_service::run
的线程才能调用完成句柄。
所谓完成句柄,就是调用 async_read 函数时注册的回调函数 |
当多个异步操作被投递时,可能由于多线程执行完成句柄导致竞态条件,这种情况下,可以使用 strand 对这些异步操作进行排序。具体而言,就是 使用 strand::wrap
对完成句柄进行包装
。例如:
socket_.async_read_some(asio::buffer(buffer_), strand_.wrap(callback1));
socket_.async_read_some(asio::buffer(buffer_), strand_.wrap(callback2));
这样,callback2 必定会在 callback1 执行完成后执行。
取消异步操作
调用 close 或者 cancel
参阅
下面列出了一些有用的资源: