fastdfs客户端go语言实现细节

首先是轮子地址:

https://github.com/tedcy/fdfs_client


最近用go做项目,要用到fastdfs

看了下github上star最多的fastdfs go客户端https://github.com/weilaihui/fdfs_client,稍微看了下感觉不太好。

首先是4个issue全是opened

看了下其中一个说连接泄漏的,看了代码,在发送文件的时候确实可能存在这个问题

另外整个连接池写的就是个BUG,来源应该是参考的https://github.com/fatih/pool

fatih/pool这个用管道来实现的连接池很精致,只是用管道来实现就不好做一些维护工作了,例如超时管理

另外代码细节很多写的有问题(发送接受文件都是一次把文件全部吃到内存里),感觉没仔细测过

没办法,虽然对go不熟,但是也只能自己造轮子了


虽然上面说的连接池有不少问题,但是作者本身对go的功底应该比我强不少,看他的代码,还是学了些东西

1 网络字节序的交互

例如发送一个int64的数据pkgLen

1
2
3
buffer := new(bytes.Buffer)
binary.Write(buffer, binary.BigEndian, pkgLen)
conn.Write(buffer.Bytes())

接收也是类似的

1
2
3
buf := make([]byte, 8)
buffer := bytes.NewBuffer(buf)
binary.Read(buffer, binary.BigEndian, &pkgLen)

2 发送字符串并且不足补齐

我很奇怪,也可能是我理解不到位,不管是bytes.Buffer还是io.Writer,go的官方库的涉及到write([]byte)的实现

遇到这种情况都不太优雅

例如fastdfs的协议,写入组名,不足补齐到16字节

我是这么写的

1
2
3
4
5
6
byteGroupName := []byte(this.groupName)
var bufferGroupName [16]byte
for i := 0; i < len(byteGroupName); i++ {
bufferGroupName[i] = byteGroupName[i]
}
buffer.Write(bufferGroupName[:])

写完觉得自己是智障,如果用for && buffer.WriteByte来实现函数,感觉又不太高效

3 发送文件

之前看的weilaihui的代码是自己读文件的,还一次全读到内存里去了,这不能忍啊

明明有sendfile的系统调用,自己找了下果然找到了

1
func (c *TCPConn) ReadFrom(r io.Reader) (int64, error)

TCPConn的ReadFrom()函数,在linux下的实现其实就是sendfile系统调用

由于使用的是net.Conn接口,所以还得转换下,conn.(net.TCPConn).ReadFrom,参数用os.File就行了

4 接受文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
type writer interface {
Write(p []byte) (int, error)
}

func writeFromConn(conn net.Conn, writer writer, size int64) error {
sizeRecv, sizeAll := int64(0), size
buf := make([]byte, 4096)
for sizeRecv+4096 <= sizeAll {
recv, err := conn.Read(buf)
if err != nil {
return err
}
if _, err := writer.Write(buf); err != nil {
return err
}
sizeRecv += int64(recv)
}
buf = make([]byte, sizeAll-sizeRecv)
recv, err := conn.Read(buf)
if err != nil {
return err
}
if int64(recv) < sizeAll-sizeRecv {
return fmt.Errorf("recv %d expect %d", recv, sizeAll-sizeRecv)
}
if _, err := writer.Write(buf); err != nil {
return err
}

return nil
}

这里的writer可以是bufio.NewWriter(*os.File),也可以是new(bytes.Buffer)

只是如果是*bufio.Writer的话,调用完这个函数还得自己做Flush()操作

5 简单的发送接收文件

实际没什么用的代码,不过还挺好玩的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package main                                                                    

import (
"fmt"
"os"
"bufio"
"net"
"time"
)

func main() {
go func() {
l,err := net.Listen("tcp",":22111")
if err != nil {
fmt.Println(err)
return
}
for {
conn,err := l.Accept()
if err != nil {
fmt.Println(err)
}
go func() {
f,err := os.Open("/root/jdk-7u67-linux-x64.tar.gz")
defer f.Close()
if err != nil {
fmt.Println(err)
return
}
if _, err := conn.(*net.TCPConn).ReadFrom(f);err != nil {
fmt.Println(err)
return
}
conn.Close()
}()
}
}()
time.Sleep(time.Second*1)

for{
file,err := os.Create("jdk-7u67-linux-x64.tar.gz")
if err != nil {
fmt.Println(err)
return
}

conn,err := net.Dial("tcp","127.0.0.1:22111")
if err != nil {
fmt.Println(err)
return
}
writer := bufio.NewWriter(file)
reader := bufio.NewReader(conn)
size,err := writer.ReadFrom(reader)
if err != nil {
fmt.Println(err)
return
}
err = writer.Flush()
if err != nil {
fmt.Println(err)
return
}
fmt.Println(size)
conn.Close()
}
}

核心就是

writer := bufio.NewWriter(file)

reader := bufio.NewReader(conn)

writer.ReadFrom(reader)

这三句,就可以把TCP的数据写入到文件里,说他没什么用是因为ReadFrom直到EOF或者其他error才会退出,否则会一直阻塞

把上述代码的server段中的主动关连接去掉,就会阻塞住

在实际的网络服务器中,都是利用自制的协议,发送header中带上长度来控制文件的大小的,这样才能用长连接做成连接池,另外每次用服务端关连接基本上就是灾难,TIMEOUT会爆表

简单用用倒是可以用呃

6 Task接口

看weilaihui的代码,上传下载的逻辑不太清晰,所以我自己写了Task接口来定义一次C/S交互行为

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
type task interface {
SendReq(net.Conn) error
RecvRes(net.Conn) error
}

func (this *Client) doStorage(task task, storageInfo *storageInfo) error {
storageConn, err := this.getStorageConn(storageInfo)
defer storageConn.Close()
if err != nil {
return err
}
if err := task.SendReq(storageConn); err != nil {
return err
}
if err := task.RecvRes(storageConn); err != nil {
return err
}

return nil
}

和服务端的交互本质就是发送一部分数据来接受另外一部分数据

实现一个task,来发送req的数据,然后接收res的数据就OK了

1
2
3
4
5
6
7
8
type storageUploadTask struct {
header
//req
fileInfo *fileInfo
storagePathIndex int8
//res
fileId string
}

这样也很容易看明白一次协议交互,需要发送哪些信息,接受哪些信息

7 连接池

一个客户端,连接池是很重要的

简单的话https://github.com/fatih/pool这个连接池就够用了

但是我希望能实现一些管理功能

因为一般服务端会在客户端没有数据包发送过来的一定时间后关闭连接,因此需要"ping"一下

但是如果用管道来实现,是无法遍历管道的数据的,因此我用list来进行实现

并且在创建连接池的时候创建一个协程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
go func() {
timer := time.NewTimer(time.Second * 20)
for {
select {
case finish := <-connPool.finish:
if finish {
return
}
case <-timer.C:
connPool.CheckConns()
timer.Reset(time.Second * 20)
}
}
}()

这样就能每过20秒来检查连接可用性,并且在外部通过向管道发送数据来销毁这个协程


如果有人给我提issue的话,这个客户端就还会更新,因此这篇文章也会更新我踩到的坑

待续