streamera - 基于 TCP 的网络摄像头

https://github.com/bipy/streamera

给本项目起了个可爱的名字streamera,意为 stream camera

本项目与 GoCV 保持一致,以 Apache-2.0 License 开源

基于裸 TCP 使用 Golang 实现,主要功能为捕获摄像头图像并推流至远端显示。

开发环境

go version go1.17.2 darwin/arm64
hybridgroup/gocv 0.28.0

模块设计

程序入口及用户交互:

通过启动参数 -c config.json 指定配置文件并读取对应运行模式的配置,使用配置文件的参数启动程序。

json 格式配置文件示例:

1
2
3
4
5
{
"mode": "client",
"ip": "127.0.0.1",
"port": 6666
}

推流客户端

通过 OpenCV 捕获摄像头的当前帧,压缩成JPEG格式并按照自定义的TCP包结构打包发送给服务端。

使用发送队列将网络操作与业务操作隔离开,避免阻塞。

服务端

服务端在运行周期内共有 4 个独立的模块并行,共使用了 6 个线程。

  1. TCP Listener 循环运行,当建立起连接时启动新线程 TCP Handler 处理连接,通过一个互斥锁来保证运行期间同一时刻只维护一个TCP连接,启动另一个新线程向连接中写入时间戳。
  2. TCP Handler 首先 defer 释放互斥锁,随后进入循环。该模块负责从 TCP 连接中接收并解析数据包,并将解析出的帧数据写入帧容器中,将解析出的时间戳等数据写入 SpeedCounter 中。由于其他模块的共同读写,TCP Handler 的读写操作均有互斥锁或读写锁保证线程安全。
  3. SpeedCounter 维护有两个线程,作用都是从实时数据中计算出每秒数据,避免最终视频输出中数据闪动过快。读写操作均有读写锁保证线程安全。
  4. 主线程负责将图像显示出来,通过 OpenCV 的 API 将网速和延迟等数据覆盖在原始帧上,并展示出来。读写操作均有读写锁保证线程安全。

TCP 包设计

编程保证了每一个包均具有以下结构:

大小 含义
8 bytes 时间戳,指示发送方打包时的时间
8 bytes 包长度,指示接下来的内容的长度
N bytes 内容,N由指示的长度确定

细节设计

防止主线程空读 & 限制 TCP 连接数

如果没有客户端连接到服务端,主线程会一直从帧容器中读取到空(nil),再经由OpenCV API调用显示的话会展示空白画面,甚至可能会导致程序崩溃。因此,先于主线程的 TCP Listener 线程会尝试获取 Frame 锁,直到建立连接后由 TCP Handler 释放 Frame锁。

TCP Listener 锁在连接建立前获取,在连接结束后释放,保证了程序的 TCP 连接唯一。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// TCP Listener 模块
func runListener(server *Server) {
for {
// 获取 TCP Listener 锁,保证只存在单 TCP 连接
server.TCPListenerMutex.Lock()
// 获取 Frame 锁, Block 住主线程
server.FrameMutex.Lock()
conn, err := server.TCPListener.Accept()
if err != nil {
fmt.Println(common.Red("broken connection " + err.Error()))
continue
}
// 成功建立TCP连接,启动 Handler goroutine
go handleConn(server, conn)
}
}
// TCP Handler 模块
func handleConn(server *Server, conn net.Conn) {
// Handler 模块退出时释放 TCP Listener 锁,随后 TCP Listener 便可以接受并建立新连接
defer server.TCPListenerMutex.Unlock()

// 释放 Frame 锁,随后主线程便可以读取 Frame 帧
server.FrameMutex.Unlock()

// 业务处理,省略
}

异常处理

程序在运行时处理了大量可能出现的异常,比如以下几种情况:

服务器宕机

客户端会缓存 180 帧的图像(约 6 秒),并且每 3 秒尝试重新连接到服务器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// 客户端:接收
func handleReceive(client *Client) {
// 省略
for {
_, err := io.ReadFull(reader, timeStamp)
// 读取时出现错误,结束本方法
if err != nil {
fmt.Println(common.Red(client.TCPConn.RemoteAddr().String() + " Down! " + err.Error()))
break
}
// 省略
}
}

// 客户端:发送
func handleSend(client *Client) {
// 省略
for pkt := range client.SendQueue {
_, err := writer.Write(pack(pkt, client.TimeDiff))
// 读取时出现错误,启动 retry 方法,结束本方法
if err != nil {
fmt.Println(common.Red("Packet Send Failed! " + err.Error()))
go retry(client)
return
}
}
}

// 断线重连
func retry(client *Client) {
// 尝试连接
conn, err := net.DialTCP("tcp", nil, &net.TCPAddr{
IP: client.RemoteIP,
Port: client.RemotePort,
})
// 如果连不上,睡眠 3 秒再试
if err != nil {
fmt.Println(common.Red(err.Error()))
time.Sleep(time.Second * 3)
go retry(client)
return
}
// 连上了,更新连接,再次启动
client.TCPConn = conn
go handleSend(client)
go handleReceive(client)
}

客户端下线

TCP Listener获取到 Frame 锁并等待新连接,服务器端的主线程循环获取不到 Frame 锁,因此网速延迟等数据不会在同一帧上覆写多次,此时表现为画面静止,与大多数直播平台表现相同。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// 服务端:TCP Listener
func runListener(server *Server) {
for {
// 如果当前有存活的连接,会在这里 asleep
// 如果当前没有连接,抢到 TCPListener 锁
server.TCPListenerMutex.Lock()
// 抢到 Frame 锁,然后再等待新连接
server.FrameMutex.Lock()
conn, err := server.TCPListener.Accept()
if err != nil {
fmt.Println(common.Red("broken connection " + err.Error()))
continue
}
go sendTimeStamp(conn)
go handleConn(server, conn)
}
}
// 服务端:TCP Handler
func handleConn(server *Server, conn net.Conn) {
// 如果本方法结束,释放 TCPListenerMutex
defer server.TCPListenerMutex.Unlock()
// 省略
for {
// 从连接中读取数据
_, err := io.ReadFull(reader, timeStamp)
// 如果连接发生错误,结束本方法
if err != nil {
fmt.Println(common.Red(conn.RemoteAddr().String() + " Down! " + err.Error()))
break
}
// 省略
}
}
// 服务端:发送时间戳
func sendTimeStamp(conn net.Conn) {
// 省略
for {
// 出现错误便结束方法
_, err := writer.Write(timePkt)
if err != nil {
return
}
}
}

流量计算

流量计算由两个模块共同完成,TCP Handler 在接收到数据包后更新当前累计收到的字节数,SpeedCounter 计算大约一秒的时间内的平均速度。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// TCP Handler 模块
func handleConn(server *Server, conn net.Conn) {
// 业务处理,省略
for {
// 业务处理,省略

server.Counter.Mutex.Lock() // 获取 SpeedCounter 锁

// 更新 SpeedCounter 中的 ByteCount,即累积收到的字节数
// 加上帧大小(动态)和包头(16 bytes)
server.Counter.ByteCount += length + common.HeadPacketSize

server.Counter.Mutex.Unlock() // 释放 SpeedCounter 锁
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// SpeedCounter 模块
func calcSpeed(counter *SpeedCounter) {
counter.Mutex.RLock() // 获取 SpeedCounter 锁

lastCount := counter.ByteCount // 获取累积字节数
lastTime := time.Now().UnixMicro() // 获取当前时间戳

counter.Mutex.RUnlock() // 释放 SpeedCounter 锁
for {
time.Sleep(time.Second) // 睡眠一秒

counter.Mutex.Lock() // 获取 SpeedCounter 锁

curTime := time.Now().UnixMicro() // 获取当前时间戳
curCount := counter.ByteCount // 获取当前累计字节数

counter.SpeedPerSecond = (curCount - lastCount) * 8 * time.Second.Microseconds() / (curTime - lastTime) // 根据流量差与时间差计算网速,并写入变量以供主线程读取

counter.Mutex.Unlock() // 释放 SpeedCounter 锁
lastTime, lastCount = curTime, curCount // 状态转移
}
}

延迟计算

TCP 包结构设计为每个包的首 8 个字节为客户端打包时的时间戳,由此便可以利用该时间戳计算从客户端打包到服务端接收所耗费的时间。

一个简单的思路是直接用服务端的当前时间减去包首部的时间,即可近似认为是传输过程中所消耗的延迟。但是这个思路忽视了一个常见的问题:客户端和服务端的时间往往是不同步的。

不同语言获取 timestamp 的具体实现可能不同,但是可以假定是调用了系统的 date 命令,那么可以认为在同一操作系统上的两个程序,他们的时间是同步的。而问题就出在这个“同一操作系统”:客户端与服务端通常情况下都不会在同一台机器上,更别提操作系统了。

那么既然大部分操作系统都是 ntp 同步过的时间,几毫秒的误差确实也在接受范围内,这个问题就不再深究了?如果两端操作系统的时间差距在几秒,几分钟呢?这正是本设计较真的地方——如何计算出准确的 TCP 延迟。

通过查阅资料,发现最常见的 TCP 延迟测试方法是 TCPing,GitHub 上看了几个 TCPing 实现的源码,原理基本都是:计时开始 –> 建立连接并马上关闭连接 –> 计时结束,讲究一点的会把 DNS Lookup 单独拿出来,但是这样的过程包含多个 RTT,并不是我想知道的“从一端到另一端的延迟”。

还有个方法是手动构建TCP包头,手动计算 checksum,发送 SYN 然后手动接收 SYN & ACK,即 TCP 三次握手的前两次,这样算一个RTT,很合理,但很麻烦。

与通用场景不同,我注意到在本项目中客户端和服务端都是由我控制的,也就是说我可以同时在两边构造payload供计算延迟使用——那么,我就想到了一个简单且有效的算法:通过校时使得客户端的时间与服务端保持一致,校时需要一个 RTT,证明如下:

假设服务端有四个时间节点,分别为 $t_0,t_1,t_2,t_3$,其中 $t_0<t_1<t_2<t_3$,客户端与服务端存在可为任意实数的时差 $\delta$,对任意时间节点,满足:
$$
t_{客户端}=t+\delta
$$
假设 $t_0$ 时刻服务端发送包含有 $t_0$ 的数据包到客户端,$t_1$ 时刻客户端接收到包,实际延迟为 $t_1-t_0$,客户端记录下:
$$
d=t_{客1}-t_0=t_1+\delta-t_0
$$
假设 $t_2$ 时刻客户端发送包含有 $t_{客2}-d$ 的数据包到客户端,$t_3$ 时刻服务端接收到包,实际延迟为 $t_3-t_2$,服务端记录下:
$$
total=t_3-(t_{客2}-d)=t_3-((t_2+\delta)-(t_1+\delta-t_0))
$$

$$
total=t_3-(t_2-t_1+t_0)
$$

$$
total=(t_3-t_2)+(t_1-t_0)
$$

即服务端最终所得值 $total$ 为一次发包和一次收包延迟的和,与两服务器间的时差无关,与客户端收包到发包之间间隔的时间无关。

使用该算法,将客户端和服务端的时差调整至 1 小时,经过测试,计算得出的延迟仍为正常值。

服务端发送校时包

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 服务端每隔一段时间(这里 128ms,可以任意)便向客户端发送自己的时间戳
func sendTimeStamp(conn net.Conn) {
writer := bufio.NewWriter(conn)
for {
time.Sleep(time.Millisecond << 7)

timePkt := make([]byte, common.TimeStampPacketSize)
binary.LittleEndian.PutUint64(timePkt, uint64(time.Now().UnixMicro()))

_, err := writer.Write(timePkt)
if err != nil {
return
}

err = writer.Flush()
if err != nil {
return
}
}
}

客户端收到包并校时

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// 客户端每收到时间戳便更新自己的记录 TimeDiff
func handleReceive(client *Client) {
reader := bufio.NewReader(client.TCPConn)
timeStamp := make([]byte, common.TimeStampPacketSize)

for {
_, err := io.ReadFull(reader, timeStamp)
if err != nil {
fmt.Println(common.Red(client.TCPConn.RemoteAddr().String() + " Down! " + err.Error()))
break
}
curTime := time.Now().UnixMicro()
recvTime := int64(binary.LittleEndian.Uint64(timeStamp))

client.TimeDiff = curTime - recvTime
}
}

// 客户端发包时使用自己的时间戳减去 TimeDiff
func pack(frame []byte, td int64) []byte {
// ------ Packet ------
// timestamp (8 bytes)
// content-length (8 bytes)
// content (content-length bytes)
// ------ End ------

timePkt := make([]byte, common.TimeStampPacketSize)
binary.LittleEndian.PutUint64(timePkt, uint64(time.Now().UnixMicro() - td))

contentLengthPkt := make([]byte, common.ContentLengthPacketSize)
binary.LittleEndian.PutUint64(contentLengthPkt, uint64(len(frame)))

var pkt []byte
pkt = append(pkt, timePkt...)
pkt = append(pkt, contentLengthPkt...)
pkt = append(pkt, frame...)

return pkt
}

服务端收到包并更新延迟

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 服务端收到校时后的包,取差值的 1/2 作为当前延迟
func handleConn(server *Server, conn net.Conn) {
// 业务处理,省略
for {
// 业务处理,省略

server.Counter.Mutex.Lock() // 获取 SpeedCounter 锁

// 更新 SpeedCounter 中的 LatencyRealTime,即实时延迟
server.Counter.LatencyRealTime = (curTime - recvTime) >> 1

server.Counter.Mutex.Unlock() // 释放 SpeedCounter 锁
}
}

服务端 SpeedCounter 每秒更新显示

1
2
3
4
5
6
7
8
9
10
// 服务端 SpeedCounter 模块
func updateLatency(counter *SpeedCounter) {
for {
counter.Mutex.Lock()
counter.LatencyPerSecond = counter.LatencyRealTime
counter.Mutex.Unlock()

time.Sleep(time.Second) // 每秒更新一次,避免视频中的延迟变化太快
}
}

截图

启动服务端

启动客户端

视频传输

经过压缩,1280*720 30fps 的视频带宽需求仅为 10~15 Mbps

两幅图运行时设置的压缩率不同,其中上图 JPEG Quality 设置为 25,下图设置为 50。可以观察到上图出现了非常明显的色带和噪点,而下图的传输所需带宽大大增加。

一些讨论

服务端与客户端都具有完善的异常处理机制,程序运行的稳定性与可用性有一定保证。

视频流压缩的部分采用的是 Go 语言官方库image/jpeg,具有较高的稳定性和性能,但可定制的参数较少。

本程序使用的 GoCV 是对 OpenCV 一层 Golang 封装,通过 GoCV 完成了摄像头捕获,视频覆写与视频输出三个功能。

为什么使用 TCP 而不是 UDP?

TCP 可以保证数据的按序到达,在该应用场景(局域网)里完全可以胜任。如果考虑到长距离、网络环境差的传输,UDP 仍然不是一个优秀的选择。

首先 UDP 限制了包的大小,对于大型的视频帧,发送方和接收方需要自己定义发送接收规则以及 Drop 策略,麻烦不说,最后实现一通效果可能还不如 TCP。

直接使用 TCP 和 UDP 在公网进行数据传输和裸奔无异,尤其是未加密的 RAW 视频流,TCP 比起 UDP 更容易套上加密层。如果不限制传输层协议的话,我会选择 h2tls1.3(可以减少一个 RTT),如果哪天 quic 普及了会更倾向于使用 quic

关于视频流的压缩?

关于视频流处理的滤镜我了解不算多,就简单地说说我的理解。

如何在接收端找到视频质量与视频码率的平衡点是视频传输的一个关键问题。为了节省网络流量,流媒体的码率一般都比较低,而随着时代发展,终端的性能已经强大到可以对视频做一些简单的处理了。最常见的就是对视频的 upscale, YUV420 色度升频到 RGB24,传统的 bilinearlanczos等算法,后来的基于神经网络的各种算法就太多了。如果视频播放端性能足够,流中间插个 vapoursynth 脚本升下频,能大幅提升视频质量。

说到底直播使用一帧帧的图片推流确实非常低效,如果能提前编码成x264或者hevc的视频再经由网络传输的话会好很多。

为什么选择 Go 语言?

因为 Go 实现网络编程与多线程编程是最简单最舒服的。

关于本项目

https://github.com/bipy/streamera

给本项目起了个可爱的名字streamera,意为 stream camera

本项目与 GoCV 保持一致,以 Apache-2.0 License 在 GitHub 开源。