卓越飞翔博客卓越飞翔博客

卓越飞翔 - 您值得收藏的技术分享站
技术文章71747本站已运行4228

解决 Go 中的十亿行挑战(从 到 s)

解决 go 中的十亿行挑战(从 到 s)

不久前,一位朋友告诉我一个挑战,涉及读取 10 亿行的文件。我发现这个想法很有趣,但由于当时是大学考试周,所以我最终把它留到以后再看。几个月后,我看到了 theo 拍摄的有关挑战的视频,并决定仔细观察。

十亿行挑战赛的目标是计算一系列城市的最低、最高和平均温度 - 具体来说,这个列表中有 10 亿个项目,其中每个项目由城市名称和温度,每个城市都可以出现多次。最后,程序必须按城市名称的字母顺序显示这些值。

我认为尝试解决挑战会很有趣,即使没有奖励。不管怎样,我写了这篇文字来描述我的过程。

第一次尝试:使代码工作

每当我需要解决更复杂的问题时,我的首要目标就是让程序运行。它可能不是最快的代码或最干净的代码,但它是有效的代码。

基本上,我创建了位置结构来表示列表中的每个城市,其中包含最低和最高温度、温度总和以及城市在列表中出现的次数(最后两个是计算平均值所必需的)。我知道有一种方法可以直接计算平均值,而无需存储温度数及其总和。但正如我之前提到的,目标是让代码正常工作。

数据列表由城市名称后跟温度组成,中间用分号分隔。例如:

antananarivo;15.6
iqaluit;-20.7
dolisie;25.8
kuopio;-6.8

读取数据最简单的方法是使用 scan,它允许您一次读取一行。通过该行,您可以将其分为两部分:分号之前和之后的值。要获取温度,您可以使用 strconv.parsefloat,它将字符串转换为浮点数。第一次实现的完整代码如下:

package main

import (
    "bufio"
    "fmt"
    "math"
    "os"
    "sort"
    "strconv"
    "strings"
)

type location struct {
    min   float64
    max   float64
    sum   float64
    count int
}

func newlocation() *location {
    return &location{
        min:   math.maxint16,
        max:   math.minint16,
        sum:   0,
        count: 0,
    }
}

func (loc *location) add(temp float64) {
    if temp  loc.max {
        loc.max = temp
    }

    loc.sum += temp
    loc.count += 1
}

var cpuprofile = flag.string("cpuprofile", "", "write cpu profile to file")

func main() {
    flag.parse()
    if *cpuprofile != "" {
        f, err := os.create(*cpuprofile)
        if err != nil {
            log.fatal(err)
        }
        pprof.startcpuprofile(f)
        defer pprof.stopcpuprofile()
    }

    file, _ := os.open("./measurements.txt")
    defer file.close()

    m := map[string]*location{}

    scanner := bufio.newscanner(file)
    for scanner.scan() {
        line := scanner.text()
        name, tempstr, _ := strings.cut(line, ";")
        temp, _ := strconv.parsefloat(tempstr, 32)

        loc, ok := m[name]
        if !ok {
            loc = newlocation()
            m[name] = loc
        }
        loc.add(temp)
    }

    keys := make([]string, 0, len(m))
    for k := range m {
        keys = append(keys, k)
    }
    sort.strings(keys)

    for _, name := range keys {
        loc := m[name]
        mean := loc.sum / float64(loc.count)
        fmt.printf("%s: %.1f/%.1f/%.1fn", name, loc.min, mean, loc.max)
    }
}

这个更简单的版本运行大约需要 97 秒。

优化字符串到浮点数的转换

分析执行情况,我意识到最大的瓶颈之一是 strconv.parsefloat 函数。基本上,它的总执行时间为 23 秒(约占总时间的 23%)。

这个函数的问题是它是通用的,也就是说,它可以与任何有效的浮点数一起使用。然而,我们的数据具有非常特定的温度格式。请参阅下面的示例:

antananarivo;15.6
iqaluit;-20.7
dolisie;5.8
kuopio;-6.8

温度格式始终相同:点前一位或两位数字,点后一位数字,并且可能在开头包含一个减号。因此,我们可以创建一个以特定方式转换值的函数,优化流程,而无需执行 parsefloat 的所有通用检查。

func bytestotemp(b []byte) float64 {
    var v int16
    var isneg int16 = 1

    for i := 0; i 



<p>为了以字节格式而不是字符串读取数据,我将扫描器的返回从字符串更改为字节<br></p>

<pre class="brush:php;toolbar:false">line := scanner.bytes()
before, after, _ := bytes.cut(line, []byte{';'})
name := string(before)
temp := bytestotemp(after)

这些小改动将运行时间缩短至 75 秒。

读取更大的数据块

使用scan的最大优点是程序不需要一次性将整个文件加载到内存中。相反,它让您逐行读取,以性能换取内存。

需要注意的是,一次读取一行和一次加载 14 gb 数据之间存在折衷。这个中间立场是读取块,即内存片段。这样,我们就可以读取 128 mb 的块,而不是一次读取整个文件。

buf := make([]byte, chunksize)
reader := bufio.newreader(file)
var leftdata []byte
for {
    n, err := reader.read(buf)
    if err != nil {
        if err == io.eof {
            break
        }
        panic(err)
    }

    chunk := append(leftdata, buf[:n]...)
    lastindex := bytes.lastindex(chunk, []byte{'n'})
    leftdata = chunk[lastindex+1:]

    lines := bytes.split(chunk[:lastindex], []byte{'n'})

    for _, line := range lines {
        before, after, _ := bytes.cut(line, []byte{';'})
        name := string(before)
        temp := bytestotemp(after)

        loc, ok := m[name]
        if !ok {
            loc = newlocation()
            m[name] = loc
        }
        loc.add(temp)
    }
}

结果,执行时间下降到了 70 秒。比以前好多了,但还有进步的空间

更改数据类型

事实上,整个挑战都围绕着带小数位的数字。然而,处理浮点始终是一个巨大的挑战(参见 ieee-754)。既然如此,为什么不用整数来表示温度呢?

type location struct {
    min   int16
    max   int16
    sum   int32
    count int32
}

如前所述,温度始终由最多三位数字表示。因此,除去逗号,值可以在-999和999之间变化,因此int16足以表示它们。对于求和和计数,int32 绰绰有余,因为该类型的范围可以在 -2147483648 到 2147483647 之间。

鉴于我们现在期望温度为 16 位整数值,我们需要修改 bytestotemp 函数。为此,我们将返回更改为 int16 并删除末尾的除法。因此,该函数将始终返回一个整数。

func bytestotemp(b []byte) int16 {
    var v int16
    var isneg int16 = 1

    for i := 0; i 



<p>最后,我修改了 add 函数以接受整数值,并调整打印以将值分开,然后再将它们显示在屏幕上。结果,时间下降了三秒,达到了六十秒。虽然不多,但胜利就是胜利。</p>

<h2>
  
  
  提高字节到字符串的转换性能
</h2>

<p>再次分析配置文件,我发现有一个名为 slicebytetostring 的函数,执行时间为 13.5 秒。分析后,我发现这个函数负责将一组字节转换为字符串(函数的名称清楚地表明了这一点)。在本例中,这是使用 string(bytes) 函数时内部调用的函数。</p>

<p>在 go 中,与大多数语言一样,字符串是不可变的,这意味着它们在创建后就无法修改(通常,当您执行此操作时,会创建一个新字符串)。另一方面,列表是可变的。换句话说,当将字节列表转换为字符串时,需要创建列表的副本,以确保列表更改时字符串不会更改。</p>

<p>为了避免这些转换中内存分配的额外成本,我们可以使用不安全库来执行字节到字符串的转换(注意:它被称为不安全是有原因的)。<br></p>

<pre class="brush:php;toolbar:false">name := unsafe.string(unsafe.slicedata(before), len(before))

与之前的情况不同,上面的函数重用传递的字节来生成字符串。这样做的问题是,如果原始列表发生变化,结果字符串也会受到影响。虽然我们可以保证在这个特定的上下文中不会发生这种情况,但在更大、更复杂的应用程序中,使用 unsafe 可能会变得非常不安全。

通过此更改,我们将执行时间减少到 51 秒。还不错。

重新实现 bytes.cut

还记得我提到过温度总是有特定的形状吗?因此,根据将线路分为两部分(城市名称和温度)的执行配置文件,运行时间为 5.38 秒。我们要手工重新制作吗?

为了分隔这两个值,我们需要找到“;”在哪里。我们已经知道,温度值可以有三到五个字符。因此,我们需要检查数字前面的字符是否是“;”。很简单吧?

idx := 0
if line[len(line)-4] == ';' {
    idx = len(line) - 4
} else if line[len(line)-5] == ';' {
    idx = len(line) - 5
} else {
    idx = len(line) - 6
}

before := line[:idx]
after := line[idx+1:]

这样,执行时间就变成了 46 秒,比之前少了大约 5 秒(谁知道呢,对吧?)。

并行性以加快处理速度

一直以来,我们的目标都是使代码在一个核心上尽可能快地运行。通过一些地方的改变,我们将时间从 97 秒减少到 46 秒。当然,您仍然可以缩短时间而无需处理并行性,但生命太短,不必担心,对吧?

为了能够在多个核心上运行代码,我决定使用 go 的原生通道结构。此外,我还创建了一个等待组,用于指示数据处理何时完成。

值得强调的是,在本例中,workers 是一个常量,定义将创建多少个 goroutine 来处理数据。就我而言,有 12 个,因为我有一个 6 核和 12 线程的处理器。
chunkchan := make(chan []byte, workers)
var wg sync.waitgroup
wg.add(workers)

下一步是创建负责从通道接收数据并处理数据的 goroutine。数据处理逻辑与单线程模型类似

for i := 0; i 



<p>最后是负责从磁盘读取数据并发送给通道的代码:<br></p>

<pre class="brush:php;toolbar:false">for {
    n, err := reader.read(buf)
    if err != nil {
        if err == io.eof {
            break
        }
        panic(err)
    }

    chunk := append(leftdata, buf[:n]...)
    lastindex := bytes.lastindex(chunk, []byte{'n'})
    leftdata = chunk[lastindex+1:]
    chunkchan 



<p>值得一提的是,go 中的映射不是线程安全的。这意味着同时访问或更改同一映射中的数据可能会导致一致性问题或错误。虽然我在测试过程中没有发现任何问题,但这个问题值得解决。 </p>

<p>解决这个问题的方法之一是为map创建一个锁定机制,一次只允许一个goroutine能够使用它。当然,这可能会使执行速度变慢一些。</p>

<p>第二种选择是为每个 goroutine 创建一个映射,这样它们之间就不会有竞争。最后,将贴图放置在新通道中,并根据它们计算主贴图值。这个解决方案仍然会有成本,但会比之前的方案低<br></p>

<pre class="brush:php;toolbar:false">close(chunkchan)

go func() {
    wg.wait()
    close(mapchan)
}()

keys := make([]string, 0, 825)

m := map[string]*location{}
for lm := range mapchan {
    for lk, lloc := range lm {
        loc, ok := m[lk]
        if !ok {
            keys = append(keys, lk)
            m[lk] = lloc
            continue
        }

        if lloc.min  loc.max {
            loc.max = lloc.max
        }
        loc.sum += lloc.sum
        loc.count += lloc.count
    }
}

此外,由于处理现在分布在不同的核心之间,块大小从 128 mb 减少到 2 mb。我通过测试各种值得出了这个数字,1 mb 到 5 mb 之间是最佳结果。平均而言,2 mb 实现了最佳性能。

无论如何,我们的处理时间从 46 秒下降到……12 秒。

优化块中的换行

每次分析配置文件时,bytes.split 函数都会引起我的注意。它的执行时间为 16 秒(考虑到所有 goroutine 的总时间),考虑到它负责将块分成行,这看起来很公平。然而,这似乎是双重工作,因为她首先打破台词,然后逐行阅读。为什么不同时做呢?

我的方法是循环遍历块并检查当前字节是否对应于 n。这样,我就可以在破坏所有线条的同时浏览它们,然后再处理它们。

start := 0
start := 0
for end, b := range chunk {
    if b != 'n' {
        continue
    }
    before, after := parseline(chunk[start:end])
    // ...
    start = end + 1
}

通过这个简单的更改,执行时间下降到大约 9 秒。

Executed in    8.45 secs    fish           external
   usr time   58.47 secs  152.00 micros   58.47 secs
   sys time    4.52 secs  136.00 micros    4.52 secs

下一步

目前应用最大的瓶颈是地图。加上所有的读写操作,需要32秒(迄今为止最长的时间)。也许创建一个更有效的哈希算法可以解决问题?它仍然是未来的一个想法。

此外,我们在不使用任何外部库的情况下成功地将时间从 1 分 40 秒减少到近 8 秒。此外,尝试让应用程序变得越来越快让我学到了很多东西。

卓越飞翔博客
上一篇: C++框架中的事件和消息处理机制
下一篇: 返回列表
留言与评论(共有 0 条评论)
   
验证码:
隐藏边栏