极狐GitLab 是 GitLab 在中国的发行版,也是一个一体化的 DevOps 平台。源代码托管是极狐GitLab的核心功能之一,本文将揭秘极狐GitLab 在处理 TB 级别的 Git 数据时候的一些自身实践。
关于极狐GitLab 的 DevOps 最佳实践可以登录极狐GitLab官网查看,或在文章末尾关注【极狐GitLab】公众号。
极狐GitLab 使用 Golang 抽象的 I/O 功能进行每小时 TB 级别的 Git 流式数据传输。下面来了解如何在 Golang 应用中编写 Reader 和 Writer。
每小时,极狐GitLab 会在服务器和客户端之间传输数 TB 的 Git 数据。除非以流方式有效地完成,否则很难甚至不可能处理如此多的流量。Git 数据由 Gitaly(Git 服务器)、GitLab Shell(通过 SSH 的 Git)和 Workhorse(通过 HTTP(S) 的 Git)提供服务。这些服务是使用 Go 实现的,Go 是一种方便地提供抽象以有效处理 I/O 操作的语言。
Golang 的
io
软件包提供了
Reader
和
Writer
接口 ,用于将 I/O 实现的功能抽象为公共接口。
Reader
接口是基础方法
Read
的封装:
type Reader interface {
Read(p []byte) (n int, err error)
}
Writer
接口是基础方法
Write
的封装:
type Writer interface {
Write(p []byte) (n int, err error)
}
例如,
os
包提供了读取文件的实现。
File
通过定义基本的 Read 和 Write 函数来类型实现
Reader
和
Writer
接口。
在这篇博文中,你将学习如何在 Golang 应用程序中编写 Reader 和 Writer。
首先,让我们读取一个文件,并且将其内容写入 os.Stdout。
func main() {
file, err := os.Open("data.txt")
if err != nil {
log.Fatal(err)
}
defer file.Close()
p := make([]byte, 32 * 1024)
for {
n, err := file.Read(p)
_, errW := os.Stdout.Write(p[:n])
if errW != nil {
log.Fatal(errW)
}
if err != nil {
if errors.Is(err, io.EOF) {
break
}
log.Fatal(err)
}
}
}
每次调用
Read
函数都会用文件中的内容填充缓冲区
p
,即文件会被分块(最多
32KB
)消耗,而不是完全加载到内存中。
为了简化这种广泛使用的模式,
io
包方便地提供了 Copy方法,允许将内容从任何
Reader
传递到任何
Writer
,并处理其他边缘情况的功能。
func main() {
file, err := os.Open("data.txt")
if err != nil {
log.Fatal(err)
}
defer file.Close()
if _, err := io.Copy(os.Stdout, file); err != nil {
log.Fatal(err)
}
}
Reader
和
Writer
接口被用在整个 Golang 接口生态中,因为它们有助于以流方式读取和写入内容。因此,将 Reader 和 Writer 与期望这些接口作为参数的函数粘合在一起是一个经常需要解决的问题。有时,它就像将内容从 Reader 传递到 Writer 一样简单,但有时写入 Writer 的内容必须表示为 Reader,或者来自 Reader 的内容必须发送到多个 Writer。让我们仔细看看不同的用例以及解决极狐GitLab 代码库中这些类型问题的示例。
Reader -> Writer
问题
我们需要将内容从 Reader 传递到 Writer 中。
解决方案
该问题可以通过使用
io.Copy
来解决。
func Copy(dst Writer, src Reader) (written int64, err error)
示例
InfoRefs*
极狐Gitaly RPC 返回了一个
Reader
,我们希望我们希望通过 HTTP 响应将其内容流式传输给用户:
func handleGetInfoRefsWithGitaly(ctx context.Context, responseWriter *HttpResponseWriter, a *api.Response, rpc, gitProtocol, encoding string) error {
...
infoRefsResponseReader, err := smarthttp.InfoRefsResponseReader(ctx, &a.Repository, rpc, gitConfigOptions(a), gitProtocol)
...
if _, err = io.Copy(w, infoRefsResponseReader); err != nil {
return err
}
...
}
Reader -> 多个Writer
问题
我们需要将内容从一个 Reader 传递到多个 Writer。
解决方案
io
软件包提供了
io.MultiWriter
函数,可以讲将多个 Writer 转换为单个 Writer。当调用
Write
函数时,内容将复制到所有 Writer。
func MultiWriter(writers ...Writer) Writer
示例
鉴于我们想从相同的内容构建
md5
、
sha1
、
sha256
以及
sha512
哈希值。
Hash
类型就是一个
Writer
。通过使用
io.MultiWriter
,我们定义
multiHash
Writer。将内容写入
multiHash
后,我们在一次运行中计算所有这些函数的哈希值。
该示例的简化版本为:
package main
import (
"crypto/sha1"
"crypto/sha256"
"fmt"
"io"
"log"
)
func main() {
s1 := sha1.New()
s256 := sha256.New()
w := io.MultiWriter(s1, s256)
if _, err := w.Write([]byte("content")); err != nil {
log.Fatal(err)
}
fmt.Println(s1.Sum(nil))
fmt.Println(s256.Sum(nil))
}
为简单起见,我们只在 Writer 上调用
Write
函数,但当内容来自 Reader 时,
io.Copy
也可以像如下一样使用:
_, err := io.Copy(io.MultiWriter(s1, s256), reader)
多个 Reader -> Reader
问题
我们有多个Reader,需要按顺序阅读。
解决方案
io
软件包提供了
io.MultiReader
函数,可以将多个 Reader 转换为一个。Reader 以传递的顺序进行读取:
func MultiReader(readers ...Reader) Reader
然后,此 Reader 可以在任何接受
Reader
作为参数的函数中使用。
示例
Workhorse 读取图像的第一个
N
字节以检测它是否是 PNG 文件,并通过从多个Reader构建单个Reader来将它们放回原处:
func NewReader(r io.Reader) (io.Reader, error) {
magicBytes, err := readMagic(r)
if err != nil {
return nil, err
}
if string(magicBytes) != pngMagic {
debug("Not a PNG - read file unchanged")
return io.MultiReader(bytes.NewReader(magicBytes), r), nil
}
return io.MultiReader(bytes.NewReader(magicBytes), &Reader{underlying: r}), nil
}
多个Reader ->多个 Writer
问题
我们需要将内容从多个 Reader 传递到多个 Writer。
解决方案
上述解决方案可以推广到多对多用例。
_, err := io.Copy(io.MultiWriter(w1, w2, w3), io.MultiReader(r1, r2, r3))
Reader -> Reader + Writer
问题
我们需要从 Reader 读取内容或将 Reader 传递给函数,同时将内容写入 Writer。
解决方案
该
io
软件包提供 io.TeeReader函数,该函数的参数为 Reader 和 Writer,并返回可进一步处理的 Reader。
func TeeReader(r Reader, w Writer) Reader
该功能的实现非常简单。传递的参数
Reader
和
Writer
都存储在结构数据中,该结构就是
Reader
本身:
func TeeReader(r Reader, w Writer) Reader {
return &teeReader{r, w}
}
type teeReader struct {
r Reader
w Writer
}
为该结构实现的
Read
函数将
Read
委托给传递的
Reader
,并对传递的
Writer
执行 Write:
func (t *teeReader) Read(p []byte) (n int, err error) {
n, err = t.r.Read(p)
if n > 0 {
if n, err := t.w.Write(p[:n]); err != nil {
return n, err
}
}
return
}
示例一
我们已经在
Multiple Writers --> Writer
部分提及了哈希相关话题,而
io.TeeReader
就被用来提供一个能够从内容创建哈希值的 Writer。而返回值 Reader 可以被进一步的用来将内容上传到对象存储中。
示例二
Workhorse 使用
io.TeeReader
来实现依赖代理功能。依赖代理缓存在对象存储中请求上游图片。还未被缓存的用例会有以下行为:
- 用户执行 HTTP 请求。
- 上游图像是使用
net/http
获取的,并通过http.Response
的Body
字段为其提供内容,该字段使用方法io.ReadClose
(以io.Reader
为基础)。 - 我们需要将此内容发送回给用户,方法是将其写入
http.ResponseWriter
(以io.Writer
为基础)。 - 我们需要通过执行
http.Request
(以io.Reader
为参数的函数) 同时将内容上传到对象存储。
因此,io.TeeReader可用于将这些基元粘合在一起:
func (p *Injector) Inject(w http.ResponseWriter, r *http.Request, sendData string) {
// Fetch upstream data via HTTP
dependencyResponse, err := p.fetchUrl(r.Context(), sendData)
...
// Create a tee reader. Each Read will read from dependencyResponse.Body and simultaneously
// perform a Write to w writer
teeReader := io.TeeReader(dependencyResponse.Body, w)
// Pass the tee reader as the body of an HTTP request to upload it to object storage
saveFileRequest, err := http.NewRequestWithContext(r.Context(), "POST", r.URL.String()+"/upload", teeReader)
...
nrw := &nullResponseWriter{header: make(http.Header)}
p.uploadHandler.ServeHTTP(nrw, saveFileRequest)
...
Writer -> Reader
问题
我们有一个接受 Writer 的函数,我们对该函数将写入 Writer 的内容感兴趣。我们希望拦截内容并将其表示为Reader,并以流式处理方式进一步处理它。
解决方案
io
软件包提供一个
io.Pipe
函数,用来返回 Reader 和 Writer:
func Pipe() (*PipeReader, *PipeWriter)
Writer 可用于传递给能够接收 Writer 的函数。已写入其中的所有内容都可以通过读取器访问,即创建一个同步内存管道,可用于将需要
io.Reader
的代码与需要
io.Writer
的代码连接起来。
示例一
对于代码导航的 LSIF 文件转换,我们需要:
- 读取 zip 文件的内容。
- 转换内容并将其序列化为
zip.Writer
。 - 将新的压缩内容表示为读取器,以流式处理方式进行进一步处理。
该 zip.Writer函数接受一个 Writer,它将压缩内容写入该 Writer。当我们需要将打开的文件描述符传递给函数以将内容保存到文件中时,这很方便。但是,当我们需要通过 HTTP 请求传递压缩内容时,我们需要将数据表示为 Reader。
// The `io.Pipe()` creates a reader and a writer.
pr, pw := io.Pipe()
// The writer is passed to `parser.transform` function which will write
// the transformed compressed content into it
// The writing should happen asynchronously in a goroutine because each `Write` to
// the `PipeWriter` blocks until it has satisfied one or more `Read`s from the `PipeReader`.
go parser.transform(pw)
// Everything that has been written into it is now accessible via the reader.
parser := &Parser{
Docs: docs,
pr: pr,
}
// pr is a reader that can be used to read all the data written to the pw writer
return parser, nil
示例二
对于 Geo 设置,极狐GitLab Shell 将所有
git push
操作代理到次节点,并将它们重定向到主节点。
- 极狐GitLab Shell 建立 SSH 连接并定义
ReadWriter
结构,该结构具有io.Reader
类型的In
字段,用来从用户读取数据,还有io.Writer
类型的Out
字段,用来将响应发送给用户。 - 极狐GitLab Shell 执行 HTTP 请求
/info/refs
,并使用io.Copy
向用户发送io.Reader
类型的response.Body
。 - 通过向
In
发送数据,用户就能对此响应做出回应,极狐GitLab Shell 需要读取此数据,将其转换为 Git HTTP 期望的请求,并将其作为 HTTP 请求发送到/git-receive-pack
。这就是io.Pipe
起作用的地方 。
func (c *PushCommand) requestReceivePack(ctx context.Context, client *git.Client) error {
// Define pipeReader and pipeWriter and use pipeWriter to collect all the data
//sent by the user converted to a format expected by Git HTTP.
pipeReader, pipeWriter := io.Pipe()
// The writing happens asynchronously because it's a blocking operation
go c.readFromStdin(pipeWriter)
// pipeReader can be passed as io.Reader and used to read all the data written to pipeWriter
response, err := client.ReceivePack(ctx, pipeReader)
...
_, err = io.Copy(c.ReadWriter.Out, response.Body)
...
}
func (c *PushCommand) readFromStdin(pw *io.PipeWriter) {
var needsPackData bool
// Scanner reads the user input line by line
scanner := pktline.NewScanner(c.ReadWriter.In)
for scanner.Scan() {
line := scanner.Bytes()
// And writes it to the pipe writer
pw.Write(line)
...
}
// The data that hasn't been processed by a scanner is copied if necessary
if needsPackData {
io.Copy(pw, c.ReadWriter.In)
}
// Close the pipe writer to signify EOF for the pipe reader
pw.Close()
}
Golang 提供了优雅的模式,旨在以流式处理方式高效处理数据。这些模式可用于解决新的挑战或重构与高内存消耗相关的现有性能问题。
版权归原作者 极小狐 所有, 如有侵权,请联系我们删除。