0


极狐GitLab 是如何处理每小时 TB 级别的 Git 数据的?

极狐GitLab 是 GitLab 在中国的发行版,也是一个一体化的 DevOps 平台。源代码托管是极狐GitLab的核心功能之一,本文将揭秘极狐GitLab 在处理 TB 级别的 Git 数据时候的一些自身实践。

关于极狐GitLab 的 DevOps 最佳实践可以登录极狐GitLab官网查看,或在文章末尾关注【极狐GitLab】公众号。

极狐GitLab 使用 Golang 抽象的 I/O 功能进行每小时 TB 级别的 Git 流式数据传输。下面来了解如何在 Golang 应用中编写 Reader 和 Writer。

每小时,极狐GitLab 会在服务器和客户端之间传输数 TB 的 Git 数据。除非以流方式有效地完成,否则很难甚至不可能处理如此多的流量。Git 数据由 Gitaly(Git 服务器)、GitLab Shell(通过 SSH 的 Git)和 Workhorse(通过 HTTP(S) 的 Git)提供服务。这些服务是使用 Go 实现的,Go 是一种方便地提供抽象以有效处理 I/O 操作的语言。

Golang 的

io

软件包提供了

Reader

Writer

接口 ,用于将 I/O 实现的功能抽象为公共接口。

Reader

接口是基础方法

Read

的封装:

type Reader interface {
    Read(p []byte) (n int, err error)
}
Writer

接口是基础方法

Write

的封装:

type Writer interface {
    Write(p []byte) (n int, err error)
}

例如,

os

包提供了读取文件的实现。

File

通过定义基本的 Read 和 Write 函数来类型实现

Reader 

Writer

接口。

在这篇博文中,你将学习如何在 Golang 应用程序中编写 Reader 和 Writer。

首先,让我们读取一个文件,并且将其内容写入 os.Stdout。

func main() {
    file, err := os.Open("data.txt")
    if err != nil {
    log.Fatal(err)
    }
    defer file.Close()

    p := make([]byte, 32 * 1024)
    for {
    n, err := file.Read(p)

    _, errW := os.Stdout.Write(p[:n])
    if errW != nil {
    log.Fatal(errW)
    }

    if err != nil {
    if errors.Is(err, io.EOF) {
    break
    }

    log.Fatal(err)
    }
    }
}

每次调用

Read

函数都会用文件中的内容填充缓冲区

p

,即文件会被分块(最多

32KB

)消耗,而不是完全加载到内存中。

为了简化这种广泛使用的模式,

io

包方便地提供了 Copy方法,允许将内容从任何

Reader

传递到任何

Writer

,并处理其他边缘情况的功能。

func main() {
    file, err := os.Open("data.txt")
    if err != nil {
    log.Fatal(err)
    }
    defer file.Close()

    if _, err := io.Copy(os.Stdout, file); err != nil {
    log.Fatal(err)
    }
}
Reader

Writer

接口被用在整个 Golang 接口生态中,因为它们有助于以流方式读取和写入内容。因此,将 Reader 和 Writer 与期望这些接口作为参数的函数粘合在一起是一个经常需要解决的问题。有时,它就像将内容从 Reader 传递到 Writer 一样简单,但有时写入 Writer 的内容必须表示为 Reader,或者来自 Reader 的内容必须发送到多个 Writer。让我们仔细看看不同的用例以及解决极狐GitLab 代码库中这些类型问题的示例。

Reader -> Writer

问题

我们需要将内容从 Reader 传递到 Writer 中。

在这里插入图片描述

解决方案

该问题可以通过使用

io.Copy

来解决。

func Copy(dst Writer, src Reader) (written int64, err error)

示例

InfoRefs*

极狐Gitaly RPC 返回了一个

Reader

,我们希望我们希望通过 HTTP 响应将其内容流式传输给用户:

func handleGetInfoRefsWithGitaly(ctx context.Context, responseWriter *HttpResponseWriter, a *api.Response, rpc, gitProtocol, encoding string) error {
        ...
        infoRefsResponseReader, err := smarthttp.InfoRefsResponseReader(ctx, &a.Repository, rpc, gitConfigOptions(a), gitProtocol)
        ...
        if _, err = io.Copy(w, infoRefsResponseReader); err != nil {
            return err
        }
        ...
}

Reader -> 多个Writer

问题

我们需要将内容从一个 Reader 传递到多个 Writer。

在这里插入图片描述

解决方案

io

软件包提供了

io.MultiWriter

函数,可以讲将多个 Writer 转换为单个 Writer。当调用

Write

函数时,内容将复制到所有 Writer。

func MultiWriter(writers ...Writer) Writer

示例

鉴于我们想从相同的内容构建

md5

sha1

sha256

以及

sha512

哈希值。

Hash

类型就是一个

Writer

。通过使用

io.MultiWriter

,我们定义

multiHash

Writer。将内容写入

multiHash

后,我们在一次运行中计算所有这些函数的哈希值。

该示例的简化版本为:

package main

import (
    "crypto/sha1"
    "crypto/sha256"
    "fmt"
    "io"
    "log"
)

func main() {
    s1 := sha1.New()
    s256 := sha256.New()

    w := io.MultiWriter(s1, s256)
    if _, err := w.Write([]byte("content")); err != nil {
    log.Fatal(err)
    }

    fmt.Println(s1.Sum(nil))
    fmt.Println(s256.Sum(nil))
}

为简单起见,我们只在 Writer 上调用

Write

函数,但当内容来自 Reader 时,

io.Copy

也可以像如下一样使用:

_, err := io.Copy(io.MultiWriter(s1, s256), reader)

多个 Reader -> Reader

问题

我们有多个Reader,需要按顺序阅读。

在这里插入图片描述

解决方案

io

软件包提供了

io.MultiReader

函数,可以将多个 Reader 转换为一个。Reader 以传递的顺序进行读取:

func MultiReader(readers ...Reader) Reader

然后,此 Reader 可以在任何接受

Reader

作为参数的函数中使用。

示例

Workhorse 读取图像的第一个

N

字节以检测它是否是 PNG 文件,并通过从多个Reader构建单个Reader来将它们放回原处:

func NewReader(r io.Reader) (io.Reader, error) {
    magicBytes, err := readMagic(r)
    if err != nil {
    return nil, err
    }

    if string(magicBytes) != pngMagic {
    debug("Not a PNG - read file unchanged")
    return io.MultiReader(bytes.NewReader(magicBytes), r), nil
    }

    return io.MultiReader(bytes.NewReader(magicBytes), &Reader{underlying: r}), nil
}

多个Reader ->多个 Writer

问题

我们需要将内容从多个 Reader 传递到多个 Writer。
在这里插入图片描述

解决方案

上述解决方案可以推广到多对多用例。

_, err := io.Copy(io.MultiWriter(w1, w2, w3), io.MultiReader(r1, r2, r3))

Reader -> Reader + Writer

问题

我们需要从 Reader 读取内容或将 Reader 传递给函数,同时将内容写入 Writer。
在这里插入图片描述

解决方案

io

软件包提供 io.TeeReader函数,该函数的参数为 Reader 和 Writer,并返回可进一步处理的 Reader。

func TeeReader(r Reader, w Writer) Reader

该功能的实现非常简单。传递的参数

Reader

Writer

都存储在结构数据中,该结构就是

Reader

本身:

func TeeReader(r Reader, w Writer) Reader {
    return &teeReader{r, w}
}

type teeReader struct {
    r Reader
    w Writer
}

为该结构实现的

Read

函数将

Read

委托给传递的

Reader

,并对传递的

Writer

执行 Write:

func (t *teeReader) Read(p []byte) (n int, err error) {
    n, err = t.r.Read(p)
    if n > 0 {
    if n, err := t.w.Write(p[:n]); err != nil {
    return n, err
    }
    }
    return
}

示例一

我们已经在

Multiple Writers --> Writer

部分提及了哈希相关话题,而

io.TeeReader

就被用来提供一个能够从内容创建哈希值的 Writer。而返回值 Reader 可以被进一步的用来将内容上传到对象存储中。

示例二

Workhorse 使用

io.TeeReader

来实现依赖代理功能。依赖代理缓存在对象存储中请求上游图片。还未被缓存的用例会有以下行为:

  • 用户执行 HTTP 请求。
  • 上游图像是使用 net/http 获取的,并通过 http.ResponseBody 字段为其提供内容,该字段使用方法 io.ReadClose(以 io.Reader 为基础)。
  • 我们需要将此内容发送回给用户,方法是将其写入 http.ResponseWriter(以 io.Writer为基础)。
  • 我们需要通过执行 http.Request(以 io.Reader为参数的函数) 同时将内容上传到对象存储。

因此,io.TeeReader可用于将这些基元粘合在一起:

func (p *Injector) Inject(w http.ResponseWriter, r *http.Request, sendData string) {
    // Fetch upstream data via HTTP
    dependencyResponse, err := p.fetchUrl(r.Context(), sendData)
    ...
    // Create a tee reader. Each Read will read from dependencyResponse.Body and simultaneously
        // perform a Write to w writer
    teeReader := io.TeeReader(dependencyResponse.Body, w)
    // Pass the tee reader as the body of an HTTP request to upload it to object storage
    saveFileRequest, err := http.NewRequestWithContext(r.Context(), "POST", r.URL.String()+"/upload", teeReader)
    ...
    nrw := &nullResponseWriter{header: make(http.Header)}
    p.uploadHandler.ServeHTTP(nrw, saveFileRequest)
    ...

Writer -> Reader

问题

我们有一个接受 Writer 的函数,我们对该函数将写入 Writer 的内容感兴趣。我们希望拦截内容并将其表示为Reader,并以流式处理方式进一步处理它。

在这里插入图片描述

解决方案

io

软件包提供一个

io.Pipe

函数,用来返回 Reader 和 Writer:

func Pipe() (*PipeReader, *PipeWriter)

Writer 可用于传递给能够接收 Writer 的函数。已写入其中的所有内容都可以通过读取器访问,即创建一个同步内存管道,可用于将需要

io.Reader

的代码与需要

io.Writer

的代码连接起来。

示例一

对于代码导航的 LSIF 文件转换,我们需要:

  • 读取 zip 文件的内容。
  • 转换内容并将其序列化为 zip.Writer
  • 将新的压缩内容表示为读取器,以流式处理方式进行进一步处理。

该 zip.Writer函数接受一个 Writer,它将压缩内容写入该 Writer。当我们需要将打开的文件描述符传递给函数以将内容保存到文件中时,这很方便。但是,当我们需要通过 HTTP 请求传递压缩内容时,我们需要将数据表示为 Reader。

// The `io.Pipe()` creates a reader and a writer.
pr, pw := io.Pipe()

// The writer is passed to `parser.transform` function which will write
// the transformed compressed content into it
// The writing should happen asynchronously in a goroutine because each `Write` to
// the `PipeWriter` blocks until it has satisfied one or more `Read`s from the `PipeReader`.
go parser.transform(pw)

// Everything that has been written into it is now accessible via the reader.
parser := &Parser{
    Docs: docs,
    pr:   pr,
}

// pr is a reader that can be used to read all the data written to the pw writer
return parser, nil

示例二

对于 Geo 设置,极狐GitLab Shell 将所有

git push

操作代理到次节点,并将它们重定向到主节点。

  • 极狐GitLab Shell 建立 SSH 连接并定义 ReadWriter 结构,该结构具有 io.Reader 类型的 In 字段,用来从用户读取数据,还有 io.Writer 类型的 Out 字段,用来将响应发送给用户。
  • 极狐GitLab Shell 执行 HTTP 请求 /info/refs ,并使用 io.Copy 向用户发送 io.Reader 类型的 response.Body
  • 通过向 In 发送数据,用户就能对此响应做出回应,极狐GitLab Shell 需要读取此数据,将其转换为 Git HTTP 期望的请求,并将其作为 HTTP 请求发送到 /git-receive-pack。这就是 io.Pipe 起作用的地方 。
func (c *PushCommand) requestReceivePack(ctx context.Context, client *git.Client) error {
    // Define pipeReader and pipeWriter and use pipeWriter to collect all the data
    //sent by the user converted to a format expected by Git HTTP.
    pipeReader, pipeWriter := io.Pipe()
    // The writing happens asynchronously because it's a blocking operation
    go c.readFromStdin(pipeWriter)

    // pipeReader can be passed as io.Reader and used to read all the data written to pipeWriter
    response, err := client.ReceivePack(ctx, pipeReader)
    ...
    _, err = io.Copy(c.ReadWriter.Out, response.Body)
    ...
}

func (c *PushCommand) readFromStdin(pw *io.PipeWriter) {
    var needsPackData bool

    // Scanner reads the user input line by line
    scanner := pktline.NewScanner(c.ReadWriter.In)
    for scanner.Scan() {
    line := scanner.Bytes()
    // And writes it to the pipe writer
    pw.Write(line)
    ...
    }

    // The data that hasn't been processed by a scanner is copied if necessary
    if needsPackData {
    io.Copy(pw, c.ReadWriter.In)
    }

    // Close the pipe writer to signify EOF for the pipe reader
    pw.Close()
}

Golang 提供了优雅的模式,旨在以流式处理方式高效处理数据。这些模式可用于解决新的挑战或重构与高内存消耗相关的现有性能问题。

标签: gitlab git 极狐GitLab

本文转载自: https://blog.csdn.net/weixin_44749269/article/details/136657506
版权归原作者 极小狐 所有, 如有侵权,请联系我们删除。

“极狐GitLab 是如何处理每小时 TB 级别的 Git 数据的?”的评论:

还没有评论