概述
在任何网络编程任务中,同步发送与接收问题都是不得不面对的核心问题之一。
网络编程是现代软件开发不可或缺的一个领域,尤其是在构建高性能的并发系统时。这通常涉及到使用如Netty、netpoll、gnet、libevent等强大的网络编程框架。然而,即使是这些高级框架,也无法避免一些通用的编程挑战,比如同步发送和接收数据的问题。
无论是简单的客户端-服务器应用,还是复杂的分布式系统,确保数据的正确性和及时性都是基本要求。
什么是同步发送与接收的问题?
简单来说就是,同步发送与接收的问题是指在某个网络连接上同时进行数据发送和接收时保持数据同步的挑战。在并发场景下,可能会有多个数据包同时在传输,需要确保每个响应都与其对应的请求匹配。
怎么解决同步发送与接收?
有很多方案来解决这个问题,目前我了解到的主要有以下三种:
- 将请求和响应放到同一个线程线程中
- 一个请求一个连接
- 使用唯一请求ID
一个请求一个连接
Http 1.0协议就是使用这种方式,
Http 1.0版本的协议就是使用这种方式,每次发送请求之前打开一个新的连接,发送请求并等待响应,这样请求和响应是可以匹配的。
由于每个请求都需要建立一个新的 TCP 连接,这会引起开销,并且无法充分利用网络资源。因此,在 HTTP/1.1 中,添加了持久连接(也叫 keep-alive 或连接复用)的特性,允许在同一TCP连接中发送多个请求,从而提高性能。但是即便如此,HTTP/1.1 仍然无法避免队头阻塞(head-of-line blocking)问题,这是因为 HTTP/1.1仍然规定请求必须按照顺序返回,尽管它们可能在服务器上并行处理。
将请求和响应放到同一线程中
HTTP 1.1协议就是使用这种方式。
将请求和响应放在同一个线程处理其中一种策略是让线程发送请求之后立即阻塞,等待响应然后处理它。这个模式对于一些简单的客户端-服务器交互或者单线程应用可能会有用,但是在处理大规模并发时可能会有性能问题。
这种解决办法的核心点就是单线程收发,请求发送出去以后就阻塞等待响应,在收到响应以后就返回一个请求响应对。
这个解决方案在高并发的情况下,你可以想象,任务队列会有一堆请求在排队。
使用唯一请求ID
HTTP 2协议就是使用的这种方式。
在 HTTP/2 中,客户端每次发起新的请求时,会在一个已经建立的TCP连接上创建一个新的流,并给这个流分配一个未使用的唯一的流标识符(Stream ID)。所有与这个请求相关的帧(例如 HEADERS 帧、DATA 帧等)都将包含这个流标识符在它们的头部字段中。
当服务器收到这些帧时,它会查看头部字段中的流ID,并将帧与该ID对应的流关联起来。然后,服务器会在同一流中发送响应,这样请求和响应就被正确地匹配在一起了。
这种基于流的模型允许在一个单一的 TCP 连接上同时处理多个并行的请求和响应,有效地解决了 HTTP/1.x 协议中的队头阻塞问题,提高了性能和效率。
上图中的stream identifier就是请求唯一标识,用来匹配响应内容。
自定义通信协议同步实现
既然已经有了解决方案,我们也基于自定义协议来实现一个同步发送和接收的实现吧。
基本思路
我们来整理一下所有涉及到的元素:
- 客户端(Client)
- 服务端(Server)
- 请求
- 响应
- 请求ID(Request Identifier)
- 响应ID(Response Identifier)
- 请求响应表(Request To Response Table)
它们之间的关系是?
-
发送(Request): 客户端为新的请求生成唯一请求ID,并建立一个对应该请求ID的ResponseFuture对象。这个对象负责等待和接收异步到达的响应。然后,客户端将包含请求ID的请求消息发送给服务端。
-
处理(Process): 服务端接收到请求消息后,解析出请求ID,并基于请求内容处理请求。
-
响应(Response): 服务端处理完毕后,构建一个响应消息,包含原始请求ID,并将此响应发送回客户端。
-
接收并完成(Receive and Complete): 客户端接收响应消息,通过响应中的请求ID找到对应的ResponseFuture对象,并把响应消息传递给它。这时,ResponseFuture对象标记为完成,触发等待该响应的客户端线程继续执行。
画个时序图
搞定收工,如有错误请留言告知,谢谢
帧定义
我们先来定义一个帧,帧只包含一个字段就是序列号,实际业务中根据自己的协议设计来自己增加字段。
type Frame struct {
Seq uint64
}
定义同步发送和接收函数
我们来定义一个SendSync函数来表示同步发送和接收函数,这个函数主要接受上面的Frame
结构并返回一个Frame
响应。
SendSync(serverAddr string, frame *Frame, timeout time.Duration) (*Frame, error)
这个函数的其他参数有服务端地址,超时时间;超时时间很有必要,这表示在最坏情况下最长多长时间可能会释放线程资源。
// TcpClient定义了TCP客户端的结构
type TcpClient struct {
// 其他字段省略
respTable map[uint64]ResponseFuture
}
// SendSync是发送同步请求并等待响应的函数
func (c *TcpClient) SendSync(serverAddr string, frame *Frame, timeout time.Duration) (*Frame, error) {
// 请求的发送和连接处理逻辑省略
// 将“会话标识符”加入等待表
rf := NewResponseFuture(frame.Seq, timeout)
defer rf.Close()
c.addSeqFuture(frame.Seq, rf)
// 发送请求逻辑省略
// 等待响应
respFrame, err := rf.Wait()
if err != nil {
return nil, err
}
delete(tcpClient.respTable, frame.Seq)
return respFrame, nil
}
// 清理资源
func (c *TcpClient) cleanupResponseFutures() {
// 设置定时器,每30秒触发一次扫描
c.ticker = time.NewTicker(30 * time.Second)
go func() {
for {
<-c.ticker.C
c.doCleanupRespFutures()
}
}()
}
func (c *TcpClient) doCleanupRespFutures() {
c.mux.Lock()
defer c.mux.Unlock()
now := time.Now()
c.mux.Lock()
defer c.mux.Unlock()
for seq, future := range c.respTable {
if now.Sub(future.Timestamp()) > 30*time.Second {
// 如果ResponseFuture超过30秒钟
// 从respTable删除
delete(c.respTable, seq)
// 关闭CountDownLatch
future.Close()
}
}
}
// ResponseFuture接口定义了“会话标识符”的行为
type ResponseFuture interface {
Add(frame *Frame)
Wait() (*Frame, error)
Close()
}
// ResponseFutureI是ResponseFuture接口的一个实现
type ResponseFutureI struct {
// Other fields and methods here
}
使用“会话标识符”模式,每个请求都会创建一个未来期货对象,该对象在内部使用CountDownLatch
或其他同步机制来等待和接收对应的响应。
还有其他解决方案吗?
除了使用“会话标识符”模式之外,还有多种方案和设计模式可以解决同步发送与接收的问题:
- 事件驱动模型:通过回调和事件监听器,将响应和对应的请求进行匹配。
- 消息队列:使用队列存储发送的请求,并在接收到响应时进行处理。
- 协程模式:利用协程来管理并发请求,可以使代码保持清晰并易于理解。
- 选择器模式:使用如Go中的
select
语句类似的构造,根据接收情况动态选择操作。
总结
在网络程序设计中,确保数据的同步发送和接收是关键。无论是利用“会话标识符”策略还是其他并发编程模式,目标都是为了确保数据的一致性和程序的可靠性。
通过上述Go代码示例能够看出,合理的程序结构和同步机制对于管理并发连接和请求至关重要。正确应用这些模式和框架,可以极大地减少在网络编程中遇到的同步问题。
搞定收工
到此为止,我们已经探讨了解决网络编程中同步发送与接收问题的策略,并通过例子展示了这些概念的Go语言实现。希望本文能帮助你更好地理解并发网络交互中的同步问题,并让你在开发高效稳定的网络应用时胸有成竹。