异步定时器

首先来看一个异步的定时器:

void idle(boost::system::error_code const& ec, asio::steady_timer* timer) {
    if(ec) {
        spdlog::error("运行 idle 循环出错:{}", ec.message());
        return;
    }

    timer->expires_at(timer->expiry() + std::chrono::seconds(1)); (3)
    timer->async_wait(boost::bind(idle, asio::placeholders::error, timer));
}

int main(int, char**) {
    asio::io_context io; (1)

    asio::steady_timer t(io, std::chrono::seconds(1));
    t.async_wait(boost::bind(idle, asio::placeholders::error, &t)); (2)

    io.run();
    return 0;
}
1首先创建一个 io_context 对象。io_context 具备以下特点:
  • io_context 是 asio 的事件循环

  • io_context 可以被多线程运行。推荐的使用方式为:per therad on loop

  • io_context 不会对打乱异步动作的顺序,因为多线程也无需加锁

2创建一个定时器并进入异步等待。Asio 的定时器都是单次的,只会被运行一次
3从 timer 对象中拿到上次过期时间,并根据过期时间计算出下一个需要激活的 时间点 。注意这里使用的 expires_at 代表了一个时间点

在上面的异步定时器中,首先创建一个定时器,然后定时器再 io.run() 中被执行。当定时器超时事件发生时,回调函数被执行,然后回调函数再次激活定时器,从而创建一个不断执行的定时器,也保证了 io_context 一直有任务在执行,不会被退出。

异步 TCP Echo 套接字

TCP Echo 套接字是一个简单的服务器例子。UDP 也与其类似

#include <cstdlib>
#include <iostream>
#include <memory>
#include <utility>
#include "asio.hpp"

using asio::ip::tcp;

class session : public std::enable_shared_from_this<session> {
public:
    session(tcp::socket socket) : socket_(std::move(socket)) { }

    void start() {
        do_read();
    }

private:
    void do_read() {
        auto self(shared_from_this());
        socket_.async_read_some(asio::buffer(data_, max_length), [this, self](std::error_code ec, std::size_t length) { (2)
            if(!ec) {
                do_write(length);
            }
        });
    }

    void do_write(std::size_t length) {
        auto self(shared_from_this());
        asio::async_write(socket_, asio::buffer(data_, length),
                          [this, self](std::error_code ec, std::size_t /*length*/) {
                              if(!ec) {
                                  do_read();
                              }
                          });
    }

    tcp::socket socket_;
    enum { max_length = 1024 };
    char data_[max_length];
};

class server {
public:
    server(asio::io_context &io_context) : acceptor_(io_context, tcp::endpoint(tcp::v4(), 9000)) { (1)
        do_accept();
    }

private:
    void do_accept() {
        acceptor_.async_accept([this](std::error_code ec, tcp::socket socket) {
            if(!ec) {
                std::make_shared<session>(std::move(socket))->start();
            }

            do_accept();
        });
    }

    tcp::acceptor acceptor_;
};

int main(int argc, char *argv[]) {
    asio::io_context io_context;

    server s(io_context);

    io_context.run();

    return 0;
}

和异步定时器类似,套接字也是运行在 io_context 上的。TCP 套接字在 Asio 中被分为监听套接字(tcp::acceptor)和普通套接字(tcp::socket)。监听套接字被用来接收连接,普通套接字被用来处理连入的连接。

1此处传入参数来构建一个监听套接字。然后进行异步监听。由于在 async_accept 的回调函数中又调用了 async_accept,因而保证了监听套接字一直有任务
2使用闭包管理套接字的生命周期

在上面的例子中,使用构造函数创建了监听套接字,这种方式下监听套接字会自动打开,但是在某些场景下可能需要手动打开或关闭套接字,这时可以使用下面的方法:

boost::system::error_code ec;

tcp::endpoint endpoint(tcp::v4(), 9000);

acceptor_.open(endpoint.protocol(), ec);
handle_err(ec);
acceptor_.bind(endpoint, ec);
handle_err(ec);

acceptor_.listen();

UDP 的也与此类似:

boost::system::error_code ec;

auto address = asio::ip::make_address("127.0.0.1", ec);
handle_err(ec);

udp::endpoint endpoint(address, 9000);
socket_.open(endpoint.protocol(), ec);
handle_err(ec);
socket_.bind(endpoint, ec);
handle_err(ec);

套接字选项

TCP 和 UDP 设置套接字的方式大同小异,其中最常见的一个选项为端口复用,其有两种设置方式:

  • 在构造函数中设置

    tcp::acceptor acceptor(io_context, endpoint, true /* 打开端口复用 */);
  • 使用普通的设置方式

    acceptor_.open(endpoint.protocol(), ec);
    handle_err(ec);
    acceptor_.set_option(tcp::socket::reuse_address(true), ec);
    handle_err(ec);
    acceptor_.bind(endpoint, ec);
    handle_err(ec);

基于协程的异步套接字

如果项目中可以使用 Boost::coroutine,那么就可以使用基于协程的套接字。使用基于协程的套接字,极大地降低了代码的复杂度:

asio::spawn(io_context_, [&](asio::yield_context yield_ctx) { (1)
    while(acceptor_.is_open()) {
        boost::system::error_code ec;

        auto socket = std::make_shared<tcp::socket>(io_context_);
        acceptor_.async_accept(*socket, yield_ctx[ec]); (2)
        handle_err(ec);

        asio::spawn(io_context_.get_executor(), [this, socket](asio::yield_context yield) { (3)
            this->accept_socket(socket, yield_ctx);
        }, asio::detached);
    }
});
1创建一个新的协程
2在 yield_ctx 下执行异步操作
3在新协程中操作接收套接字

如上所示,通过使用协程,避免了回调函数的相互嵌套,简化了开发流程。对于 asio::spawn 可以简单地将其比作新线程。

异步操作

除了普通的异步方法外,Asio 还提供了其它方法来讲我们自己的方法变成异步操作,具体为以下两个方法:

方法是否阻塞作用

io_context::strand::defer

函数对象在此函数结束后才会被调度

asio::dispatch

提交一个完成令牌或函数对象执行

asio::post

提交一个完成令牌或函数对象执行

读取数据到变量中

asio::buffer 具有第二种重载方式:

mutable_buffer buffer(
    const mutable_buffer & b,
    std::size_t max_size_in_bytes);

因此,如果需要将变量读取到变量中,方式为:

uint64_t data_len;
socket_.async_receive(asio::buffer(&data_len, sizeof(data_len)), yield_ctx[ec]);

异步文件

Asio 通过类 asio::stream_file 完成了对文件异步读写的支持,使用的方式为:

asio::stream_file file;
file.open(file_name,
    asio::stream_file::create |
    asio::stream_file::write_only |
    asio::stream_file::append, ec
);
if(ec) {
    spdlog::error("打开日志文件失败:{}", ec.message());
    return;
}
asio::async_write(file, asio::buffer(res, res.size()), yield_ctx[ec]);

多播

使用多播时有一些不同。现在假设多播地址为 address:port,然后对于接收者和发送者的操作分别为:

-- sender
create_udp_socket()
join_multicast_group()
send_to(address:port)
-- recever
create_udp_socket()
listen("0.0.0.0":port)
join_multicast_group()
recv()

当一个套接字加入一个组播后,此套接字就可以收到来自此组播地址的数据。然后使用绑定时的端口对数据进行过滤。因此这里 listen 的唯一作用就是对组播地址进行过滤

由于绑定的地址是“通配地址”,因此只有端口过滤起作用。也就是说,在上面的 demo 中,recever 可用得到所有发往 port 的数据,无论组播地址是什么

更好的办法是直接绑定组播地址,但是 windows 无法绑定组播地址

在单机测试中,可用通过绑定发送端的 ip 来解决。但是单机中发送端 ip 和接收端 ip 相同,不知道实际应该如何

自动拆包

Asio 的 read_until 函数可用实现对 TCP 数据流的自动拆包。自动拆包主要用到了下面的重载形式:

async_read_until(
    AsyncReadStream & s,
    DynamicBuffer_v1 && buffers,
    MatchCondition match_condition
)

其中 MatchCondition 有两种形式:函数抑或是类。由于函数其本身限制,下面只讲解类的方式。

要使用类的方式实现 MatchCondiction,有两个要点:

  • 实现 std::pair<iterator, bool> operator()(iterator begin, iterator end) 方法

  • 使用

    template<>
    struct boost::asio::is_match_condition<HeaderParser> : public boost::true_type { };

    注册类型

下面是一个 MatchCondition 的实现例子:

下面的例子中,协议的规定为:

data_size\n
file_name\n
file_data

其中 data_size + file_name 的长度不能超过 1024 字节

class HeaderParser {
public:
    HeaderParser(std::shared_ptr<tcp::socket> sck, std::string& file_name, uint64_t& data_begin)
        : sck_(sck), file_name_(file_name), data_begin_(data_begin) { }

    using iterator = asio::buffers_iterator<asio::streambuf::const_buffers_type>;

    std::pair<iterator, bool> operator()(iterator begin, iterator end) {
        do {
            if(end - begin <= 2) break;

            auto end_it = end - begin >= 1024 ? begin + 1024 : end;
            auto pos1   = std::find(begin, end_it, '\n');
            auto pos2   = std::find(pos1 + 1, end_it, '\n');
            if(pos2 == end_it) {
                if(end - begin > 1024) {    // 1024 字节内如果还没完成报文头,则认定数据非法
                    sck_->close();
                }
                break;
            }

            auto data_size = std::stoull(std::string { pos1 + 1, pos2 + 1 });
            if(end - pos2 - 1 < data_size) {
                return { begin, false };
            }
            file_name_ = { begin, pos1 + 1 };
            bf::trim(file_name_);
            spdlog::info("解析时文件的大小为:{}", end - pos2 - 1);

            data_begin_ = pos2 - begin + 1;
            spdlog::info("数据的偏移量为:{}", data_begin_);

            return { end, true }; // 返回数据的起始点
        } while(false);

        return { begin, false };
    }

private:
    std::shared_ptr<tcp::socket> sck_;
    std::string&                 file_name_;
    uint64_t&                    data_begin_;
};

// 注册类型
template<>
struct boost::asio::is_match_condition<HeaderParser> : public boost::true_type { };

使用方式为:

std::string               file_name;
uint64_t                  data_begin;
asio::async_read_until(*sck, asio::dynamic_buffer(data),
                       HeaderParser(sck, file_name, data_begin), yield_ctx[ec]);

异步文件

Asio 通过 asio::stream_file 实现了异步的文件访问操作。下面是一个例子:

asio::stream_file file { ctx_ };
file.open(file_name,
            asio::file_base::create | asio::file_base::write_only | asio::file_base::truncate,
            ec);
if(ec) {
    spdlog::error("文件打开失败:{}, {}", ec.to_string(), ec.message());
    return;
}

spdlog::info("准备写入文件 {}", file_name);

auto beg = &data[0] + data_begin;
asio::async_write(file, asio::buffer(beg, data.size() - data_begin), yield_ctx[ec]);
if(ec) {
    spdlog::error("写入文件时发生错误:{}, {}", ec.to_string(), ec.message());
}

多线程

asio 的 io_service::run 可以在多个线程调用。从而实现多线程请求。

在进行多线程中,只有调用了 io_service::run 的线程才能调用完成句柄。

所谓完成句柄,就是调用 async_read 函数时注册的回调函数

当多个异步操作被投递时,可能由于多线程执行完成句柄导致竞态条件,这种情况下,可以使用 strand 对这些异步操作进行排序。具体而言,就是 使用 strand::wrap 对完成句柄进行包装 。例如:

socket_.async_read_some(asio::buffer(buffer_), strand_.wrap(callback1));
socket_.async_read_some(asio::buffer(buffer_), strand_.wrap(callback2));

这样,callback2 必定会在 callback1 执行完成后执行。

取消异步操作

调用 close 或者 cancel

参阅

下面列出了一些有用的资源:

Last moify: 2022-12-04 15:11:33
Build time:2025-08-18 18:43:08
Powered By asphinx