NSQ 如何实现文件队列

  1. NSQ如何实现文件队列
    1. NSQ 的 diskQueue
    2. 结构定义
    3. 写入操作
    4. 读取操作
    5. 消息处理循环
    6. 磁盘写入
    7. 磁盘读取
    8. 磁盘同步
    9. 文件轮转
    10. 清空和退出
    11. 尾声

NSQ如何实现文件队列

消息队列是一个比较常见的组件。

在将并行任务串行化处理,或者将生产者和消费者进行解耦的时候,都可以使用消息队列来做。在 Golang 的标准库中,也有内存队列库可以直接使用。但是,内存队列的缺陷就是一旦出现问题,程序意外退出,就会导致所有队列中未处理的数据丢失。

基于上面的考虑,那么我们就会使用可持久化的外部消息队列中间件,比如 Kafka。但是引入一个新的组件就对整个系统的稳定性上又增加了一些不确定,同时整个系统的运维成本又变高了,那么不如自己实现一个可以持久化的文件队列,既满足要求又不增加复杂性。

NSQ 的 diskQueue

下面的讲解,是基于 NSQ 实现的文件队列 diskQueue 来进行的。具体的地址在这里 ,这个文件队列比较简单,但也实现了常用的功能:

  1. 支持 消息入队出队,并将堆积消息写入磁盘;
  2. 支持 设置写入磁盘的触发消息数;
  3. 支持 设置消息长度,在生产和消费时检查是否合法;
  4. 支持 设置单文件大小,自动文件轮转;

结构定义

type diskQueue struct {
    // 64bit atomic vars need to be first for proper alignment on 32bit platforms

    readPos      int64 // 文件读位置
    writePos     int64 // 文件写位置
    readFileNum  int64 // 正在读的文件编号
    writeFileNum int64 // 正在写的文件编号
    depth        int64 // 未消费数量

    sync.RWMutex       // 读写消息的锁

    name            string        // 文件名称
    dataPath        string        // 文件路径
    maxBytesPerFile int64         // 单文件最大 Bytes
    minMsgSize      int32         // 消息最小长度
    maxMsgSize      int32         // 消息最大长度
    syncEvery       int64         // 触发磁盘同步的消息数量
    syncTimeout     time.Duration // 触发磁盘同步的等待事件
    exitFlag        int32         // 队列正在退出的标志位
    needSync        bool          // 磁盘同步标志位


    nextReadPos     int64   // 下次文件读位置
    nextReadFileNum int64   // 下次读的文件编号

    readFile  *os.File      // 读文件的句柄
    writeFile *os.File      // 写文件的句柄
    reader    *bufio.Reader // 读文件的缓冲区
    writeBuf  bytes.Buffer  // 写文件的缓冲区

    readChan chan []byte    // 用于数据读取的管道

    depthChan         chan int64  // 用于传递数据积压量管道
    writeChan         chan []byte // 写消息 Channel
    writeResponseChan chan error  // 写消息结果 Channel
    emptyChan         chan int    // 通知清空队列 Channel
    emptyResponseChan chan error  // 通知清空队列结果 Channel
    exitChan          chan int    // 通知退出 Channel
    exitSyncChan      chan int    // 磁盘同步协程 退出结果 Channel

    logf AppLogFunc  // 日志函数
}

整个结构体的定义比较庞大,里面涉及的变量比较多。我们先梳理主要的几个变量之间的关系。首先,写消息 ->写文件 ->读消息 这个流程。

img

上面的图中,使用不同的颜色做了分类:蓝色是函数、绿色是变量、靛青是 Channel、紫色是用户操作。

我们可以清楚的看到,Put()ReadChan() 是用户发起的操作,ioLoop() 是 diskQueue 初始化时执行的一个协程。这三个操作分别发起了三个流程,我们分别来看:

写入操作

写入操作是比较简单的,首先上锁,然后检查 diskQueue 是否在退出状态,然后直接向 writeChan 写即可。这里需要注意的是,为什么需要用一个读写锁,而不是互斥锁

在写入的时候,其实加了“读锁”,并不能起到排他的左右,实际上保证消息串行写入文件的是依靠 writeChan,它是一个阻塞 Channel。

而写加锁是在 diskQueue 退出时用的,保证所有写的协程都不能再获取到读锁之后,将 diskQueue 关闭,这一部分是在 exit() 方法中实现的。我们来看一下代码:

// Put writes a []byte to the queue
func (d *diskQueue) Put(data []byte) error {
    // 加读锁
    d.RLock()
    defer d.RUnlock()

    // 检查 退出标志位是否置位
    if d.exitFlag == 1 {
        return errors.New("exiting")
    }

    // 向 writeChan 写入消息
    d.writeChan <- data
  
    // 等待写入结果
    return <-d.writeResponseChan
}

// 退出整个 diskQueue
func (d *diskQueue) exit(deleted bool) error {
    // 加写锁,互斥所有写入操作
    d.Lock()
    defer d.Unlock()

    // 将 exitFlag 置位,写消息方法 Put() 中会检查此标志位
    d.exitFlag = 1

    // 日志
    if deleted {
        d.logf(INFO, "DISKQUEUE(%s): deleting", d.name)
    } else {
        d.logf(INFO, "DISKQUEUE(%s): closing", d.name)
    }

    // 关闭 exitChan,表示退出,ioLoop 
    close(d.exitChan)
    
    // ioLoop 退出时,会向此 channel 写值
    // 这里是等待 ioLoop 退出
    <-d.exitSyncChan

    // 关闭消息堆积的 Channel
    close(d.depthChan)

    // 关闭读的文件
    if d.readFile != nil {
        d.readFile.Close()
        d.readFile = nil
    }

    // 关闭写的文件
    if d.writeFile != nil {
        d.writeFile.Close()
        d.writeFile = nil
    }

    // 退出
    return nil
}

读取操作

读取操作相对写入来说,要简单很多,用户只需要从 ReadChan() 方法中拿到读数据的 Channel,然后读取就可以。readChan 是一个阻塞 Channel,当没有需要消费的消息时,会阻塞在读取上。

// ReadChan returns the receive-only []byte channel for reading data
func (d *diskQueue) ReadChan() <-chan []byte {
    return d.readChan
}

消息处理循环

这个 Loop 是整个 diskQueue 中最重要的一环,就是上图中间的部分。但实际上,这个 Loop 做了比图中更多的 事情:

  1. 如果 readChan 中没有消息,则从文件中读取一条消息,写到 Channel 中;
  2. 如果 writeChan 中有消息,则将消息写到文件中,并在 writeResponseChan 写入结果;
  3. 如果 emptyChan 中有消息,则清空整个队列;
  4. 如果 exitChan 中有消息,则退出整个队列;
  5. 如果 队列的读写操作达到了 syncEvery 阈值,则执行磁盘同步 操作;
  6. 定期执行磁盘同步操作;

我们来看一下代码:

func (d *diskQueue) ioLoop() {
    var dataRead []byte // 存储 从文件中读出的一条消息
    var err error       // 存储 所有操作的错误
    var count int64     // 记录 所有读写操作数量
    var r chan []byte   // readChan 的 copy

    // 设置自动磁盘同步的计时器
    syncTicker := time.NewTicker(d.syncTimeout)

    for {
        
        // 当读写操作数和 syncEvery 相等,则将 needSync 置位
        if count == d.syncEvery {
            d.needSync = true
        }

        // 如果需要做磁盘同步,则调用 sync() 并将 count 清零
        if d.needSync {
            err = d.sync()
            if err != nil {
                d.logf(ERROR, "DISKQUEUE(%s) failed to sync - %s", d.name, err)
            }
            count = 0
        }

        // 如果有未消费的数据
        // 当前读的文件编号 小于 当前正在写的文件编号,认为是有未读取消息
        // 当前读取的位置 小于 当前写的位置,认为有未读取的消息
        if (d.readFileNum < d.writeFileNum) || (d.readPos < d.writePos) {
      
            // nextReadPos == readPos 表示上一次读的消息已经被消费
            // nextReadPos > readPos  表示上一次读的消息没有被消费
            if d.nextReadPos == d.readPos {
                // 从文件中拿一条消息,存到 dataRead
                dataRead, err = d.readOne()
                if err != nil {
                    d.logf(ERROR, "DISKQUEUE(%s) reading at %d of %s - %s",
                        d.name, d.readPos, d.fileName(d.readFileNum), err)
                    d.handleReadError()
                    // 如果有问题,那么从头进入循环
                    continue
                }
            }
            // 将 readChan 拷贝一份到 r
            // 这是因为,当执行到这里,diskQueue 被关闭的话
            // r 是一个 nil 这样消息不会被消费掉
            r = d.readChan
        } else {
              // 如果没有消息,那么将 r 赋值为 nil
            r = nil
        }

        select {
        // 在 select 对 nil 的 Channel 读写的话,都会被跳过
        case r <- dataRead:
            count++  // 操作计数增加
            // 检查是否需要移动到下一个文件
            d.moveForward()
        case d.depthChan <- d.depth:  // 用来返回堆积消息数量的 Channel
        case <-d.emptyChan:           // 获取是否有 emptyChan 的请求
            // 删除所有的文件,并返回清空的结果
            d.emptyResponseChan <- d.deleteAllFiles()
              // 将 count 清零 
            count = 0
        case dataWrite := <-d.writeChan: // 如果有消息写入
            // 操作数 +1
            count++
              // 写入磁盘,并返回写入结果
            d.writeResponseChan <- d.writeOne(dataWrite)
        case <-syncTicker.C: // 计时器到期
            if count == 0 {    // 这一个计时周期没有操作,那么就不触发磁盘同步 
                continue
            }
            d.needSync = true  // 触发同步
        case <-d.exitChan:   // 收到退出信号
            goto exit
        }
    }

exit:
    d.logf(INFO, "DISKQUEUE(%s): closing ... ioLoop", d.name)
    syncTicker.Stop()   // 将计时器停止
  d.exitSyncChan <- 1 // 标记 ioLoop 已经退出 ,exit() 方法会等待此 channel
}

到此,我们基本上将队列的读取和写入流程介绍完了。我们很多变量都已经讲解到了 。但是还剩一些具体的方法没有讲解。我们先将磁盘写入、读取和同步的方法详解一下。

磁盘写入

首先是写一条数据。这里我们要首先说一下消息是怎么存在文件中的。整个文件是以二进制的形式存在,存入消息时,会在消息前面加一个4个字节的整数,表示了一个消息的长度。所以,文件中的每条消息长度为  实际消息长度+4

// 写一条消息
func (d *diskQueue) writeOne(data []byte) error {
    var err error

      // 当写文件句柄为空时,打开新的文件
    if d.writeFile == nil {
        // fileName 返回组合后的文件名
        curFileName := d.fileName(d.writeFileNum)
        d.writeFile, err = os.OpenFile(curFileName, os.O_RDWR|os.O_CREATE, 0600)
        if err != nil {
            return err
        }

        d.logf(INFO, "DISKQUEUE(%s): writeOne() opened %s", d.name, curFileName)
        // 如果写的位置是不是文件开始,那么久 seek 到具体的位置
        if d.writePos > 0 {
            _, err = d.writeFile.Seek(d.writePos, 0)
            if err != nil {
                d.writeFile.Close()
                d.writeFile = nil
                return err
            }
        }
    }

      // 获取消息实际长度
    dataLen := int32(len(data))

      // 检查消息长度合法性
    if dataLen < d.minMsgSize || dataLen > d.maxMsgSize {
        return fmt.Errorf("invalid message write size (%d) maxMsgSize=%d", dataLen, d.maxMsgSize)
    }

      // 重置 buffer
    d.writeBuf.Reset()
  
      // 写入 buffer 消息长度
    err = binary.Write(&d.writeBuf, binary.BigEndian, dataLen)
    if err != nil {
        return err
    }
 
      // 写入 buffer 实际消息内容
    _, err = d.writeBuf.Write(data)
    if err != nil {
        return err
    }

    // 写入文件
    _, err = d.writeFile.Write(d.writeBuf.Bytes())
    if err != nil {
        d.writeFile.Close()
        d.writeFile = nil
        return err
    }

      // 写入消息的长度
    totalBytes := int64(4 + dataLen)
  
    d.writePos += totalBytes   // 增加 writePos 到下一个写入点
    d.depth += 1   // 消息堆积数 +1
 
    if d.writePos >= d.maxBytesPerFile { // 写入位置大于最大的文件大小设置
        d.writeFileNum++ // 写文件编号 +1
        d.writePos = 0   // 写入位置置 0

        // 打开新文件时,一定要先做磁盘同步
        err = d.sync()
        if err != nil {
            d.logf(ERROR, "DISKQUEUE(%s) failed to sync - %s", d.name, err)
        }

        if d.writeFile != nil { // 将当前文件句柄关掉
            d.writeFile.Close()
            d.writeFile = nil
        }
    }

    return err
}

磁盘读取

磁盘内容的读取,基本上和写入的流程是差不多的。

❗注意,这里读完一条消息之后,没有对 depth 进行减1,是在 moveForward() 中做了这一步。详细讲解请看 “文件轮转” 小节。

func (d *diskQueue) readOne() ([]byte, error) {
    var err error
    var msgSize int32

      // 初始化读取文件句柄
    if d.readFile == nil {
        curFileName := d.fileName(d.readFileNum)
        d.readFile, err = os.OpenFile(curFileName, os.O_RDONLY, 0600)
        if err != nil {
            return nil, err
        }

        d.logf(INFO, "DISKQUEUE(%s): readOne() opened %s", d.name, curFileName)

        // 查找到上次读取的位置
        if d.readPos > 0 {
            _, err = d.readFile.Seek(d.readPos, 0)
            if err != nil {
                d.readFile.Close()
                d.readFile = nil
                return nil, err
            }
        }

        // 初始化 reader
        d.reader = bufio.NewReader(d.readFile)
    }

      // 读 4个字节 -> 消息的长度
    err = binary.Read(d.reader, binary.BigEndian, &msgSize)
    if err != nil {
        d.readFile.Close()
        d.readFile = nil
        return nil, err
    }

      // 检查消息长度合法性
    if msgSize < d.minMsgSize || msgSize > d.maxMsgSize {
        // this file is corrupt and we have no reasonable guarantee on
        // where a new message should begin
        d.readFile.Close()
        d.readFile = nil
        return nil, fmt.Errorf("invalid message read size (%d)", msgSize)
    }

      // 初始化 buffer
    readBuf := make([]byte, msgSize)
      // 将消息读入 buffer
    _, err = io.ReadFull(d.reader, readBuf)
    if err != nil {
        d.readFile.Close()
        d.readFile = nil
        return nil, err
    }

      // 计算消息总体长度
    totalBytes := int64(4 + msgSize)

    // nextReadPos 赋值为下一条消息的位置
    // 如果读出来的这条被消费掉,那么 readPos 会等于 nextReadPos
    // 具体看 ioLoop
    d.nextReadPos = d.readPos + totalBytes
 
     // nextReadFileNum 赋值为当前的文件编号
    d.nextReadFileNum = d.readFileNum

    // 如果下一个读取位置 大于 最大的文件大小配置
      // 则开启下一个文件读
    if d.nextReadPos > d.maxBytesPerFile {
        if d.readFile != nil {
            d.readFile.Close()
            d.readFile = nil
        }

        d.nextReadFileNum++
        d.nextReadPos = 0
    }

    return readBuf, nil // 返回消息
}

磁盘同步

磁盘同步很简单,调用了系统调用,将缓存内容同步。
这里复杂的地方在于,它还进行了 metadata 的磁盘同步工作。metadata 是记录了当前写入位置,当前读取位置等等的一些信息,用方法 persistMetaData 实现。

func (d *diskQueue) sync() error {
    if d.writeFile != nil { // 如果文件句柄不为空
        err := d.writeFile.Sync() // 同步磁盘
        if err != nil {
            d.writeFile.Close()
            d.writeFile = nil
            return err
        }
    }

    err := d.persistMetaData() // 同步 metadata
    if err != nil {
        return err
    }

    d.needSync = false // 因为刚刚同步,所以将 needSync 置为 false
    return nil
}

// 存储 metadata
func (d *diskQueue) persistMetaData() error {
    var f *os.File
    var err error

      // 拿到文件名
    fileName := d.metaDataFileName()
      // 打开一个带 tmp 后缀的文件
      // 这里是为了不让文件写一半程序就退出导致问题
    tmpFileName := fmt.Sprintf("%s.%d.tmp", fileName, rand.Int())

    // 打开文件
    f, err = os.OpenFile(tmpFileName, os.O_RDWR|os.O_CREATE, 0600)
    if err != nil {
        return err
    }

      // 堆积数、读文件编号、读位置、写文件编号、写位置
      // 按照一行一个写入文件
    _, err = fmt.Fprintf(f, "%d\n%d,%d\n%d,%d\n",
        d.depth,
        d.readFileNum, d.readPos,
        d.writeFileNum, d.writePos)
    if err != nil {
        f.Close()
        return err
    }
    f.Sync()    // 同步磁盘
    f.Close()   // 关闭句柄

      // 将 tmp 文件变更为实际文件
    return os.Rename(tmpFileName, fileName)
}

到此,生产消息,消费消息的部分就已经全部结束了。剩余还有一些文件轮转、退出、清空的动作,我们在下面统一讲解

文件轮转

diskQueue 支持设置单文件存储最大值,如果超过这个值,就会新开一个文件,来继续存储消息。文件的名称会按照基础的文件名,添加一个数字的后缀,这个数字的后缀,就是由 writeFileNum 来控制的,是一个递增的变量。

readOnewriteOne 中,已经有一部分代码在实现这部分功能。当读/写的下一个位置超过了文件大小最大值的设定时,就会自增 writeFileNum ,来达到文件轮转的效果。

但是,已经读完的文件还需要清理掉。这就是 ioLoop 中,调用 moveForward() 的作用。就是当读完一个文件时,用作清理。

func (d *diskQueue) moveForward() {
    oldReadFileNum := d.readFileNum // 假设当前的文件编号是旧的
    d.readFileNum = d.nextReadFileNum  // 将下一个文件编号赋值给 readFileNum
    d.readPos = d.nextReadPos  // 更新要读取的位置
    d.depth -= 1 // 消息已经被消费,堆积数减1

    // 如果旧文件编号不等于当前文件编号,则认为需要清理旧文件
    if oldReadFileNum != d.nextReadFileNum {
        d.needSync = true // 每次使用新文件,都同步磁盘

        fn := d.fileName(oldReadFileNum) // 拿到旧文件名
        err := os.Remove(fn)  // 删除掉
        if err != nil {
            d.logf(ERROR, "DISKQUEUE(%s) failed to Remove(%s) - %s", d.name, fn, err)
        }
    }

    d.checkTailCorruption(d.depth)  // 清理一些异常尾巴
}

func (d *diskQueue) checkTailCorruption(depth int64) {
      // 一切正常
    if d.readFileNum < d.writeFileNum || d.readPos < d.writePos {
        return
    }

    // 已经读到文件队列的尾巴了,但是堆积数还是不为零
      // 那就说明有问题,需要清理异常的数据
    if depth != 0 {
        if depth < 0 {
            d.logf(ERROR,
                "DISKQUEUE(%s) negative depth at tail (%d), metadata corruption, resetting 0...",
                d.name, depth)
        } else if depth > 0 {
            d.logf(ERROR,
                "DISKQUEUE(%s) positive depth at tail (%d), data loss, resetting 0...",
                d.name, depth)
        }
        // force set depth 0
        d.depth = 0
        d.needSync = true
    }

    if d.readFileNum != d.writeFileNum || d.readPos != d.writePos {
        if d.readFileNum > d.writeFileNum {
            d.logf(ERROR,
                "DISKQUEUE(%s) readFileNum > writeFileNum (%d > %d), corruption, skipping to next writeFileNum and resetting 0...",
                d.name, d.readFileNum, d.writeFileNum)
        }

        if d.readPos > d.writePos {
            d.logf(ERROR,
                "DISKQUEUE(%s) readPos > writePos (%d > %d), corruption, skipping to next writeFileNum and resetting 0...",
                d.name, d.readPos, d.writePos)
        }

        d.skipToNextRWFile() // 强制跳到下一个文件
        d.needSync = true
    }
}

// 这里相当于不再使用之前的所有文件,从下一个文件编号重新开始
func (d *diskQueue) skipToNextRWFile() error {
    var err error

    if d.readFile != nil { // 关闭读句柄
        d.readFile.Close()
        d.readFile = nil
    }

    if d.writeFile != nil { // 关闭写句柄
        d.writeFile.Close()
        d.writeFile = nil
    }

      // 将读的文件编号,一直到写的文件编号的所有文件,全部删除
    for i := d.readFileNum; i <= d.writeFileNum; i++ {
        fn := d.fileName(i)
        innerErr := os.Remove(fn)
        if innerErr != nil && !os.IsNotExist(innerErr) {
            d.logf(ERROR, "DISKQUEUE(%s) failed to remove data file - %s", d.name, innerErr)
            err = innerErr
        }
    }

      // 初始化队列的信息
    d.writeFileNum++
    d.writePos = 0
    d.readFileNum = d.writeFileNum
    d.readPos = 0
    d.nextReadFileNum = d.writeFileNum
    d.nextReadPos = 0
    d.depth = 0

    return err
}

清空和退出

主要是调用了我们之前讲解的 exit() 方法,来达到效果。通过传入参数来控制是否要删除所有文件。

func (d *diskQueue) Close() error {
    err := d.exit(false)
    if err != nil {
        return err
    }
    return d.sync()
}

func (d *diskQueue) Delete() error {
    return d.exit(true)
}

尾声

至此,整个文件消息队列的介绍就结束了,这个消息队列配合一个短小的内存队列(可以用带缓存的 channel),就可以做到实现一个相对高效又不会丢非常高风险的内部队列系统。对于日常的开发有着极大的好处。

整个文件消息队列的实现也比较精巧。充分利用了 Golang 中 select 和 channel 的特性,是非常值得日常开发中借鉴的。


署名-非商业性使用-相同方式共享 4.0