大家好,我是yangyang.既上一次分享一篇国外PHP技术大佬性能挑战笔记《PHP处理10亿行数据》后,小编今天又看到了国外golang大佬分享的go语言处理10亿行数据挑战的笔记,因此,分享给大家.
十亿行挑战(1BRC)非常简单:任务是开发一个能够读取 10 亿行文件的程序,聚合每行中包含的信息,并打印包含结果的报告。文件中的每一行都包含格式为 的气象站名称和温度读数<station name>;<temperature>,其中气象站名称可以包含空格和除 之外的其他特殊字符,温度是范围从到 的;浮点数,精度限于一位小数。预期的输出格式为,按站点名称的字母顺序排序,其中 min、mean 和 max 表示每个站点计算的最小、平均和最大温度读数。-99.999.9{<station name>=<min>/<mean/<max>, ...}
测量文件示例:
Yellowknife;16.0Entebbe;32.9Porto;24.4Vilnius;12.4Fresno;7.9Maun;17.5Panama City;39.5...
预期输出示例:
{Abha=-23.0/18.0/59.2, Abidjan=-16.2/26.0/67.3, Abéché=-10.0/29.4/69.0, ...}
鉴于 10 亿行文件大约为 13GB,官方存储库没有提供固定数据库,而是提供了一个脚本来生成具有随机读数的合成数据。只需按照说明创建您自己的数据库即可。
尽管该挑战主要针对 Java 开发人员,但该问题预设了一个有趣的玩具练习,可以用任何语言进行实验。由于我每天都在Gamers Club中使用 Golang ,所以我决定尝试一下,看看我能走多深。但在继续这篇文章之前,我想承认,尽管我很精通 Golang,但我并不是 Golang 方面的专家,而且我对低级优化有点愚蠢——我对这个领域从来没有很感兴趣。
在本文中,我将介绍我为优化解决方案而采取的所有步骤。所有内容均在 12 核 24 线程的 Ryzen 9 7900X PC(未超频,因此 4.7HGz)、华擎 B650M-HDC/M.2 主板、2x16GB 6000mhz DDR5 Kingston Fury Beast RAM(也未超频且无EXPO 已启用)和金士顿 SSD SV300S37A/120G。采用 Go 1.22.0 AMD64 的 Windows 11。
我提供的部分结果是在编辑器和浏览器打开时从运行中获得的最低一致值。最终结果通过 55 次执行的汇总结果来呈现。
在决定认真应对这一挑战之前,我很好奇读取和处理这个可怕的 10 亿行文件有多慢。我有一种感觉,如果采用天真的方法来解决这个问题需要很长时间。在这种好奇心的驱使下,我想尝试一下并实现尽可能简单的解决方案:
type StationData struct { Name string Min float64 Max float64 Sum float64 Count int}func run() { data := make(map[string]*StationData) file, err := os.Open("measurements.txt") if err != nil { panic(err) } defer file.Close() scanner := bufio.NewScanner(file) for scanner.Scan() { line := scanner.Text() parts := strings.Split(line, ";") name := parts[0] tempStr := strings.Trim(parts[1], "\n") temperature, err := strconv.ParseFloat(tempStr, 64) if err != nil { panic(err) } station, ok := data[name] if !ok { data[name] = &StationData{name, temperature, temperature, temperature, 1} } else { if temperature < station.Min { station.Min = temperature } if temperature > station.Max { station.Max = temperature } station.Sum += temperature station.Count++ } } printResult(data)}func printResult(data map[string]*StationData) { result := make(map[string]*StationData, len(data)) keys := make([]string, 0, len(data)) for _, v := range data { keys = append(keys, v.Name) result[v.Name] = v } sort.Strings(keys) print("{") for _, k := range keys { v := result[k] fmt.Printf("%s=%.1f/%.1f/%.1f, ", k, v.Min, v.Sum/float64(v.Count), v.Max) } print("}\n")}func main() { started := time.Now() run() fmt.Printf("%0.6f", time.Since(started).Seconds())}
令我惊讶的是,上面的代码运行了约 95 秒,比我预期的要好得多。
请注意,本文的其余部分将使用printResult和main函数,几乎没有任何变化。
我心满意足地上床睡觉,但怎么也睡不着。我知道处理数据需要多少时间,但我不停地问,在没有处理文件开销的情况下,打开和读取文件的最快时间是多少。
func run() { file, err := os.Open("measurements.txt") if err != nil { panic(err) } defer file.Close() scanner := bufio.NewScanner(file) for scanner.Scan() { scanner.Bytes() } if err := scanner.Err(); err != nil { panic(err) }}
第一次尝试。请注意,我使用的是Bytes()而不是String(),快速研究告诉我String()转换速度较慢并且涉及内存分配。该Bytes()函数重用内部缓冲区,返回相同的对象,因此没有额外的分配。结果是惊人的36秒。
实验 | 时间(以秒为单位) |
基本扫描仪 | 36.114680 |
扫描仪默认配置对于这项任务来说确实很糟糕。我已经知道可以读取得更快,因为Java 条目的结果时间可以快至 1.5 秒。但36s秒却出奇的慢。
Scanner 类有一个Buffer函数,它接受预定义的[]byte对象和最大数量的元素,以应对缓冲区大小增大的情况。由于没有太多关于它内部工作原理的细节,我尝试使用它并测试了一些不同的值BUFFER_SIZE:
scanner.Buffer(make([]byte, BUFFER_SIZE), BUFFER_SIZE)
缓冲区大小 | 时间(以秒为单位) |
1024 | 27.4543000 |
64*64 | 12.6580670 |
128*128 | 8.2336520 |
256*256 | 7.0902360 |
512*512 | 6.9288370 |
1024*1024 | 6.7667700 |
2048*2048 | 6.7406200 |
4096*4096 | 6.7061090 |
8192*8192 | 7.0757130 |
好多了!因此,使用大约 2048x2048 和 4096x4096 字节(分别为~4MB 和~16MB)的缓冲区可以提高 80%,达到大约6.7 秒。
实验 | 时间(以秒为单位) |
基本扫描仪 | 36.114680 |
扫描缓冲器 | 6.706109 |
我可以做的另一个快速测试是使用该bufio.Reader对象,它逐字节读取:
reader := bufio.NewReader(file)for { _, err := reader.ReadByte() if err == io.EOF { break } if err != nil { panic(err) }}
这一更改实际上将时间增加到了25.508648 秒。
实验 | 时间(以秒为单位) |
Basic Scanner | 36.114680 |
Scanner Buffer | 6.706109 |
Bufio Reader | 25.508648 |
相反reader.ReadByte(),我尝试了reader.ReadLine,这将时间减少到12.632035 秒,但仍然比扫描仪慢。
实验 | 时间(以秒为单位) |
Basic Scanner | 36.114680 |
Scanner Buffer | 6.706109 |
Bufio Reader | 25.508648 |
Bufio Reader (by line) | 12.632035 |
经过初步探索后,我检查了Scanner.Scan 内部的工作原理,发现它做了很多我不需要的事情。它对缓冲区对象进行了很多操作,但不知道为什么。我还发现它使用了file.Read我以前从未使用过的。我们来尝试一下:
buffer := make([]byte, 1024)for { _, err := file.Read(buffer) if err == io.EOF { break } if err != nil { panic(err) }}
结果是18.867314 秒。
实验 | 时间(以秒为单位) |
Basic Scanner | 36.114680 |
Scanner Buffer | 6.706109 |
Bufio Reader | 25.508648 |
Bufio Reader (by line) | 12.632035 |
File Read | 18.867314 |
请注意,它File.Read接受一个缓冲区。当我们在对象中配置缓冲区时Scanner,扫描仪在内部使用该缓冲区从文件中读取。所以我再次测试了不同的缓冲区大小:
缓冲区大小 | 时间(以秒为单位) |
128*128 | 2.612572 |
256*256 | 1.394397 |
512*512 | 1.189664 |
1024*1024 | 1.008404 |
2048*2048 | 0.984717 |
4096*4096 | 1.045845 |
8192*8192 | 1.321442 |
伟大的!现在这更有意义了。使用 2048x2048 (~4MB) 的0.984717 秒似乎是缓冲区读取的绝佳选择。
实验 | 时间(以秒为单位) |
Basic Scanner | 36.114680 |
Scanner Buffer | 6.706109 |
Bufio Reader | 25.508648 |
Bufio Reader (by line) | 12.632035 |
文件读取(File Read) | 18.867314 |
文件读缓冲区(File Read Buffer) | 0.984717 |
我缺乏知识来解释为什么大缓冲区file.Read比其他版本好得多,但我相信这可能与如何从 SSD 检索信息有关。
为了完成最小结构,我想与多个 goroutine 进行通信,以了解可能会增加多少开销。我的想法是创建一个单一的 goroutine 并使用通道将缓冲区直接发送到其中,这样我就可以单独测量通信的成本。
func consumer(channel chan []byte) { for { <-channel }}func run() { channel := make(chan []byte, 10) go consumer(channel) file, err := os.Open("measurements.txt") if err != nil { fmt.Println(err) return } defer file.Close() buffer := make([]byte, BUFFER_SIZE) for { _, err := file.Read(buffer) if err == io.EOF { break } if err != nil { panic(err) } channel <- buffer }}
这将时间增加到1.266833 秒。
实验 | 时间(以秒为单位) |
Basic Scanner | 36.114680 |
Scanner Buffer | 6.706109 |
Bufio Reader | 25.508648 |
Bufio Reader (by line) | 12.632035 |
文件读取(File Read) | 18.867314 |
文件读缓冲区(File Read Buffer) | 0.984717 |
--- | |
单线程 | 1.266833 |
我仍然遇到一些问题。第一个是File.Read(buffer)每次读取都会覆盖缓冲区,因此,如果我们直接将缓冲区发送到通道,除其他同步问题外,消费者将读取不一致的数据。一旦我们添加更多的 goroutine,这个问题就会变得更严重。
为了避免这种情况,我将缓冲区数据复制到另一个数组中:
data := make([]byte, n)copy(data, buffer[:n])channel <- data
将时间增加到大约2.306197 秒,几乎增加了一倍。请注意,我尝试使用 手动创建切片并复制数据for _, b in := range buffer,但没有任何改进。
实验 | 时间(以秒为单位) |
Basic Scanner | 36.114680 |
Scanner Buffer | 6.706109 |
Bufio Reader | 25.508648 |
Bufio Reader (by line) | 12.632035 |
文件读取(File Read) | 18.867314 |
文件读缓冲区(File Read Buffer) | 0.984717 |
--- | |
单线程 | 1.266833 |
复制缓冲区(Copying Buffer) | 2.306197 |
扩展我们的解决方案的一种自然方法是将每个数据块发送到并行运行的不同 goroutine。 Goroutines 独立聚合数据,完成后,主线程应该合并信息。我相信这个过程类似于一些 NoSQL 数据库优化查询的方式。
然而,此时,主线程从文件中读取固定的缓冲区量,但行可以有不同的长度,这意味着缓冲区将剪切最后一行,除非我们真的很幸运。
我添加了一个“Leftover Logic”来存储一次读取中不完整的最后一行,以用作下一个块的第一部分。
readBuffer := make([]byte, BUFFER_SIZE)leftoverBuffer := make([]byte, 1024)leftoverSize := 0for { n, err := file.Read(readBuffer) if err == io.EOF { break } if err != nil { panic(err) } // Find the last '\n' (byte=10) m := 0 for i := n - 1; i >= 0; i-- { if readBuffer[i] == 10 { m = i break } } data := make([]byte, m+leftoverSize) copy(data, leftoverBuffer[:leftoverSize]) copy(data[leftoverSize:], readBuffer[:m]) copy(leftoverBuffer, readBuffer[m+1:n]) leftoverSize = n - m - 1 channel <- data}
结果为2.359820 秒。
实验 | 时间(以秒为单位) |
Basic Scanner | 36.114680 |
Scanner Buffer | 6.706109 |
Bufio Reader | 25.508648 |
Bufio Reader (by line) | 12.632035 |
文件读取(File Read) | 18.867314 |
文件读缓冲区(File Read Buffer) | 0.984717 |
--- | |
单线程 | 1.266833 |
复制缓冲区(Copying Buffer) | 2.306197 |
Leftover Logic | 2.359820 |
如前所述,这里的自然演变是创建一个工作流程,其中 goroutine 处理数据块并返回部分聚合,主线程合并并呈现结果。
我的想法是创建一个 goroutine 列表,并按顺序向每个 goroutine 发送数据,循环 goroutine 直到文件末尾。此修改没有显着增加时间。此时,我只是复制了旧的处理,对消费者进行了一些更改:
func consumer(input chan []byte, output chan map[string]*StationData, wg *sync.WaitGroup) { defer wg.Done() data := make(map[string]*StationData) separator := []byte{';'} for reading := range input { scanner := bufio.NewScanner(bytes.NewReader(reading)) for scanner.Scan() { // Processing using bytes instead of string line := scanner.Bytes() parts := bytes.Split(line, separator) if len(parts) != 2 { fmt.Println("Invalid line: ", string(line)) continue } name := string(parts[0]) temperature, err := strconv.ParseFloat(string(parts[1]), 64) if err != nil { fmt.Println(err) return } station, ok := data[name] if !ok { data[name] = &StationData{name, temperature, temperature, temperature, 1} } else { if temperature < station.Min { station.Min = temperature } if temperature > station.Max { station.Max = temperature } station.Sum += temperature station.Count++ } } } output <- data}func run() { inputChannels := make([]chan []byte, N_WORKERS) outputChannels := make([]chan map[string]*StationData, N_WORKERS) var wg sync.WaitGroup wg.Add(N_WORKERS) // Create workers for i := 0; i < N_WORKERS; i++ { input := make(chan []byte, CHANNEL_BUFFER) output := make(chan map[string]*StationData, 1) go consumer(input, output, &wg) inputChannels[i] = input outputChannels[i] = output } file, err := os.Open("measurements.txt") if err != nil { panic(err) } defer file.Close() readBuffer := make([]byte, READ_BUFFER_SIZE) leftoverBuffer := make([]byte, 1024) leftoverSize := 0 currentWorker := 0 for { n, err := file.Read(readBuffer) if err == io.EOF { break } if err != nil { panic(err) } m := 0 for i := n - 1; i >= 0; i-- { if readBuffer[i] == 10 { m = i break } } data := make([]byte, m+leftoverSize) copy(data, leftoverBuffer[:leftoverSize]) copy(data[leftoverSize:], readBuffer[:m]) copy(leftoverBuffer, readBuffer[m+1:n]) leftoverSize = n - m - 1 inputChannels[currentWorker] <- data currentWorker++ if currentWorker >= N_WORKERS { currentWorker = 0 } } // Closes the input channels, making the workers to leave their processing loop for i := 0; i < N_WORKERS; i++ { close(inputChannels[i]) } // Wait for all workers to finish processing wg.Wait() for i := 0; i < N_WORKERS; i++ { close(outputChannels[i]) } // Aggregates the results data := make(map[string]*StationData) for i := 0; i < N_WORKERS; i++ { for station, stationData := range <-outputChannels[i] { if _, ok := data[station]; !ok { data[station] = stationData } else { if stationData.Min < data[station].Min { data[station].Min = stationData.Min } if stationData.Max > data[station].Max { data[station].Max = stationData.Max } data[station].Sum += stationData.Sum data[station].Count += stationData.Count } } }}
现在我们需要调整 2 个新参数:工作线程数量 ( N_WORKERS) 和通道缓冲区大小 ( CHANNEL_BUFFER)。为了发现这些参数的影响,我创建了一个针对每对配置运行的网格测试,您可以在下表中看到结果。
Workers\ | 1 | 10 | 15 | 25 | 50 | 100 |
5 | 21.33 | 20.74 | 20.41 | 20.29 | 19.51 | 20.13 |
10 | 13.36 | 11.25 | 11.87 | 11.08 | 11.86 | 11.93 |
15 | 11.25 | 9.6 | 9.63 | 8.9 | 9.13 | 12.27 |
25 | 11.59 | 8.25 | 8.35 | 8.31 | 8.33 | 8.27 |
50 | 9.88 | 8.51 | 8.41 | 8.42 | 8.28 | 8.01 |
100 | 9.38 | 8.54 | 8.30 | 8.54 | 8.29 | 8.56 |
正如预期的那样,通道中具有单个消息缓冲区的几个 Goroutine 会锁定主线程,等待通道变得可用。在缓冲区大小为 10 后,超过 25 个工作线程没有显著增益。为了实现平衡设置,我将继续使用 25 个工作线程和 25 的缓冲区大小。
从基本实现开始,我将展示我如何识别并优化各个代码路径。如果您想重复该过程,只需在程序开头添加以下代码片段即可:
f, err := os.Create("cpu_profile.prof")if err != nil { panic(err)}defer f.Close()if err := pprof.StartCPUProfile(f); err != nil { panic(err)}defer pprof.StopCPUProfile()
然后运行go tool pprof -http 127.0.0.1:8080 cpu_profile.prof,这将打开一个详细的站点,显示 CPU 分析见解。下图是该网站提供的报告之一,称为火焰图。在这里,我们可以看到我的代码中最严重的违规者:bytes.Split、strconv.ParseFloat、slicebytetostring和mapaccess2_faststr:
优化 | 时间(以秒为单位) |
单线程 | 95.000000 |
Goroutine | 8.327305 |
让我们从bytes.Split()函数开始,我用它来分割每行的名称和温度读数。我们可以在火焰图中看到,大部分时间消耗归因于内存分配(runtime.makeslice和runtime.mallocgc)。最简单的解决方案是为名称和温度保留固定的切片缓冲区,并将字节从原始行复制到新缓冲区:
func consumer(input chan []byte, output chan map[string]*StationData, wg *sync.WaitGroup) { defer wg.Done() data := make(map[string]*StationData) nameBuffer := make([]byte, 100) temperatureBuffer := make([]byte, 50) for reading := range input { scanner := bufio.NewScanner(bytes.NewReader(reading)) for scanner.Scan() { line := scanner.Bytes() nameSize, tempSize := parseLine(line, nameBuffer, temperatureBuffer) name := string(nameBuffer[:nameSize]) temperature, err := strconv.ParseFloat(string(temperatureBuffer[:tempSize]), 64) ...func parseLine(line, nameBuffer, temperatureBuffer []byte) (nameSize, tempSize int) { i, j := 0, 0 for line[i] != 59 { // stops at 59, which is the ASCII code for; nameBuffer[j] = line[i] i++ j++ } i++ // skip ; k := 0 for i < len(line) && line[i] != 10 { // stops at 10, which is the ASCII code for \n temperatureBuffer[k] = line[i] i++ k++ } return j, k}
仅通过此更改,我们就可以达到5.526411 秒。
优化 | 时间(以秒为单位) |
单线程 | 95.000000 |
协程(Goroutine) | 8.327305 |
自定义字节分割 | 5.526411 |
新的火焰图如下所示:
现在,下一个主要问题是字节到字符串的转换(对于名称)以及随后的映射查找。前者是一个问题,因为该语句string(nameBuffer[:nameSize])也分配内存。幸运的是,在循环交互中不需要这种转换。
字符串形式的名称有两个用途:首先,将其存储在StationData结构中,其次,用于在映射中查找。映射查找涉及从键中提取哈希并应用内部逻辑来定位结构中的相应数据。我们可以通过发送预散列密钥来加速此过程。
我决定使用FNV hash,它是 Go 中的内置函数。我不知道它是如何工作的,但它是有效的:
hash := fnv.New64a() // Create a new FNV hash nameBuffer := make([]byte, 100) temperatureBuffer := make([]byte, 50) for reading := range input { scanner := bufio.NewScanner(bytes.NewReader(reading)) for scanner.Scan() { line := scanner.Bytes() nameSize, tempSize := parseLine(line, nameBuffer, temperatureBuffer) // Note that we removed the string convertion here name := nameBuffer[:nameSize] temperature, err := strconv.ParseFloat(string(temperatureBuffer[:tempSize]), 64) if err != nil { panic(err) } hash.Reset() hash.Write(name) id := hash.Sum64() // Compute the data key here, generating a uint64 station, ok := data[id] if !ok { data[id] = &StationData{strign(name), temperature, temperature, temperature, 1} } else { ...
另一个显着改进:4.237007 秒。
优化 | 时间(以秒为单位) |
单线程 | 95.000000 |
协程(Goroutine) | 8.327305 |
自定义字节分割 | 5.526411 |
自定义字节哈希(Custom Byte Hash) | 4.237007 |
下图显示了新的火焰图。请注意, 被slicebytetostring严重减少并mapaccess2_faststr更改为mapacess2_fast64。
下一个大错是strconv.ParseFloat。我尝试了直接将字节转换为浮点数的相同方法:
// Attempt 1:var temperature float64binary.Read(bytes.NewReader(temperatureBuffer[:k]), binary.BigEndian, &temperature)// Attemp 2:temperature, err := bytesconv.ParseFloat(temperatureBuffer[:k], 64)
第一次尝试是使用binary内置包。但它的表现却差了很多。第二次尝试是使用包bytesconv中的包perf,如您在此处看到的,但结果是相同的。我还考虑过解析各个字节,但我无法想到对当前功能有任何真正的改进。
然而,此时,我已经咨询了一些 Java 解决方案,他们使用的最佳方法之一是将温度转换为 int 而不是 float,事实证明这效率更高。只需使用:
temperature, err := strconv.ParseInt(string(temperatureBuffer[:tempSize]), 10, 64)
已经显示出一些改进。但请注意,如果我们只是将数据从字符串转换为 int,我们将丢失小数点。因此,我编写了一个自定义 int 转换,它将保留小数点并生成与 等效的 int int(float64(temperature_string)*10)。我们可以调整最终结果,将最小值、平均值和最大值除以 10。
func bytesToInt(byteArray []byte) int { var result int negative := false for _, b := range byteArray { if b == 45 { // 45 = `-` signal negative = true continue } // For each new number, move the old number one digit to the left. result = result*10 + int(b-48) // 48 = '0', 49 = '1', ... } if negative { return -result } return result}temperature := bytesToInt(temperatureBuffer[:tempSize])
又一个重大改进:3.079632 秒。
优化 | 时间(以秒为单位) |
单线程 | 95.000000 |
Goroutine | 8.327305 |
自定义字节分割 | 5.526411 |
自定义字节哈希 | 4.237007 |
解析浮点数 | 3.079632 |
请注意,在新的火焰图中,我们Scanner.Scan现在可以看到 成为相关部分。
自定义扫描函数非常简单,我们只需读取字节直到找到\n.我更改了parseLine以包含以下逻辑:
for reading := range input { readingIndex := 0 for readingIndex < len(reading) { next, nameSize, tempSize := nextLine(readingIndex, reading, nameBuffer, temperatureBuffer) readingIndex = next name := nameBuffer[:nameSize] temperature := bytesToInt(temperatureBuffer[:tempSize]) ...func nextLine(readingIndex int, reading, nameBuffer, temperatureBuffer []byte) (nexReadingIndex, nameSize, tempSize int) { i := readingIndex j := 0 for reading[i] != 59 { // ; nameBuffer[j] = reading[i] i++ j++ } i++ // skip ; k := 0 for i < len(reading) && reading[i] != 10 { // \n if reading[i] == 46 { // . i++ continue } temperatureBuffer[k] = reading[i] i++ k++ } readingIndex = i + 1 return readingIndex, j, k}
使我们处于2.825991 秒
优化 | 时间(以秒为单位) |
单线程 | 95.000000 |
Goroutine | 8.327305 |
自定义字节分割 | 5.526411 |
自定义字节哈希 | 4.237007 |
解析浮点数 | 3.079632 |
自定义扫描 | 2.825991 |
像往常一样,下一个火焰图:
在注意到fnv.(*sum64a).Write.此时,我已经完成了一些分析,并且可以提取有关测量文件中的数据的一些见解。一个有趣的发现是确定在不与其他站冲突的情况下表示站名称所需的字节数。在我的数据库中,我发现我需要 9 个字节,值范围从 65 到 ~196。
使用这些信息,我想到了将每个数字连接成一个大的 uint64,同时确保该值不超过上限18446744073709551615:
func hash(name []byte) uint64 { n := min(len(name), 10) // 10 bytes, one more than we need just to be safe var result uint64 for _, b := range name[:n] { v := b - 65 var m uint64 = 10 if v >= 10 { m = 100 } else if v >= 100 { m = 1000 } result = result*m | uint64(b) } return result}
在我的解决方案中实现它之前,我对 fnv 哈希进行了测试,运行这些函数超过 410 个名称,迭代 10000 次:
值得注意的是,这个哈希值是非常特殊的,并且可能在不同的数据集中失败,这是稍后的问题。尽管如此,在这个特殊情况下,结果是2.717442 秒。
优化 | 时间(以秒为单位) |
单线程 | 95.000000 |
Goroutine | 8.327305 |
自定义字节分割 | 5.526411 |
自定义字节哈希 | 4.237007 |
解析浮点数 | 3.079632 |
自定义扫描 | 2.825991 |
自定义哈希值 | 2.717442 |
新的火焰图:
是时候对付房间里的大象了,runtime.mapaccess2_fast64地图查找。尽管花了几个小时的研究,但我找不到任何可行的方法来优化内置map。然而,有一个名为Swiss Map的社区替代方案,它自称比内置map更快、内存效率更高。替换它几乎是一个简单的过程,只需进行一些语法更改:
data := swiss.NewMap[uint64, *StationData](1024) // important the initial value to be high ... station, ok := data.Get(id) if !ok { data.Put(id, &StationData{string(name), temperature, temperature, temperature, 1}) } else { ...
使用Swiss Map,我不仅可以将最短时间减少到2.677549 秒,而且时间更加一致地接近 2.7 秒。
优化 | 时间(以秒为单位) |
单线程 | 95.000000 |
Goroutine | 8.327305 |
自定义字节分割 | 5.526411 |
自定义字节哈希 | 4.237007 |
解析浮点数 | 3.079632 |
自定义扫描 | 2.825991 |
自定义哈希值 | 2.717442 |
Swiss Map | 2.677549 |
重新审视哈希函数,我在网上找到了这个算法,它看起来比我的哈希更简单:
func hash(name []byte) uint64 { var h uint64 = 5381 for _, b := range name { h = (h << 5) + h + uint64(b) } return h}
重复我之前做过的相同测试:
几乎好 5 倍。有了这个新版本,我可以进一步将时间减少到2.588396 秒。
优化 | 时间(以秒为单位) |
单线程 | 95.000000 |
Goroutine | 8.327305 |
自定义字节分割 | 5.526411 |
自定义字节哈希 | 4.237007 |
解析浮点数 | 3.079632 |
自定义扫描 | 2.825991 |
自定义哈希值 | 2.717442 |
Swiss Map | 2.677549 |
DJB2 hash | 2.588396 |
消费者是热路径,因此,其中的任何函数调用都可能产生不必要的开销。然而,内联所有函数并没有显示出任何改进,反而让我丢失了分析信息。
在起草本文时,我意识到我非常接近达到我使用主线程配置设置的最小阈值。经过分析,主线程中的大部分时间都与 goroutine 通信相关 - 读取文件需要 0.98 秒,将数据传送给 goroutine 需要 1.306 秒 - 我突然意识到我可以将引用移至file消费者并完全消除通信开销,包括将通信缓冲区替换为固定缓冲区,减少内存分配开销。
readBuffer := make([]byte, READ_BUFFER_SIZE)for { lock.Lock() n, err := file.Read(readBuffer) lock.Unlock() ...}
通过将读取任务委托给使用者,goroutine 可以使用互斥体在本地读取文件,以避免任何并发问题。仅出于测试目的,我将丢弃缓冲区的第一行和最后一行,以避免目前复杂的分布式剩余逻辑。结果进一步缩短至2.108564秒!
优化 | 时间(以秒为单位) |
单线程 | 95.000000 |
Goroutine | 8.327305 |
自定义字节分割 | 5.526411 |
自定义字节哈希 | 4.237007 |
解析浮点数 | 3.079632 |
自定义扫描 | 2.825991 |
自定义哈希值 | 2.717442 |
Swiss Map | 2.677549 |
DJB2 hash | 2.588396 |
Worker Reading | 2.108564(无效输出) |
为了恢复每个块的第一行和最后一行,我创建了一个Trash Bingoroutine,它从其他 goroutine 接收丢弃的部分,并尝试将各个字节合并为完整的行:
请注意,第一个 Goroutine 的第一行始终是完整的有效行,最后一个 Goroutine 的最后一行始终为空(文件以 \n 结尾)。之间的所有部分都通过它们的 id 进行匹配。每次从文件读取都会增加 id,第一行保留为前一个 id,最后一行采用下一个 id。该过程由读取文件时使用的同一个互斥锁控制,保证了并发一致性。垃圾箱合并这些部分,考虑到它们可能是块的初始字节(绿色)或块的最终字节(蓝色)。加起来我们的时间是2.107401 秒
优化 | 时间(以秒为单位) |
单线程 | 95.000000 |
Goroutine | 8.327305 |
自定义字节分割 | 5.526411 |
自定义字节哈希 | 4.237007 |
解析浮点数 | 3.079632 |
自定义扫描 | 2.825991 |
自定义哈希值 | 2.717442 |
Swiss Map | 2.677549 |
DJB2 hash | 2.588396 |
Worker Reading | 2.107401 |
我终于意识到我不需要名称和温度缓冲区。如果我只使用读取缓冲区的子切片,则不需要一遍又一遍地复制名称和温度字节。通过此更改,我可以进一步将时间缩短至2.070337 秒!
优化 | 时间(以秒为单位) |
单线程 | 95.000000 |
Goroutine | 8.327305 |
自定义字节分割 | 5.526411 |
自定义字节哈希 | 4.237007 |
解析浮点数 | 3.079632 |
自定义扫描 | 2.825991 |
自定义哈希值 | 2.717442 |
Swiss Map | 2.677549 |
DJB2 hash | 2.588396 |
无名称和临时缓冲区 | 2.070337 |
最终火焰图:
为了以时尚的方式完成,我想执行一个新的网格测试。但是,我需要为每个设置提供更多样本,以解决时间变化问题,因为时间变化现在彼此太接近了。由于我删除了通道缓冲区,因此我只有两个参数:读取缓冲区大小 ( READ_BUFFER_SIZE) 和工作人员数量 ( N_WORKERS)。
每个配置运行 15 次(有[READ_BUFFER_SIZE/N_WORKER]变化)后,结果如下:
[1024/025] Median:2.219864 Min:2.176016 Max:2.250926 Average:2.218512[1024/045] Median:2.232022 Min:2.219185 Max:2.254537 Average:2.232696[1024/050] Median:2.230360 Min:2.224120 Max:2.252666 Average:2.232072[1024/055] Median:2.231307 Min:2.211719 Max:2.257619 Average:2.232077[1024/075] Median:2.184521 Min:2.172680 Max:2.202394 Average:2.184986[1024/100] Median:2.114413 Min:2.098569 Max:2.126056 Average:2.114260[2048/025] Median:2.112397 Min:2.082739 Max:2.127026 Average:2.109444[2048/045] Median:2.080504 Min:2.052388 Max:2.101977 Average:2.077760[2048/050] Median:2.060059 Min:2.043855 Max:2.081475 Average:2.063090[2048/055] Median:2.052428 Min:2.036254 Max:2.064517 Average:2.051182[2048/075] Median:2.006269 Min:1.987433 Max:2.037105 Average:2.008363[2048/100] Median:2.012136 Min:1.998069 Max:2.033619 Average:2.012824[2500/025] Median:2.085206 Min:2.051325 Max:2.149669 Average:2.088950[2500/045] Median:2.052305 Min:2.042962 Max:2.065635 Average:2.052392[2500/050] Median:2.042339 Min:2.023924 Max:2.064953 Average:2.043449[2500/055] Median:2.042570 Min:2.033146 Max:2.058255 Average:2.043690[2500/075] Median:2.030126 Min:2.021994 Max:2.059866 Average:2.033594[2500/100] Median:2.033681 Min:2.019922 Max:2.052254 Average:2.035110[3000/025] Median:2.124746 Min:2.082351 Max:2.138877 Average:2.118401[3000/045] Median:2.065936 Min:2.038780 Max:2.082305 Average:2.059880[3000/050] Median:2.053482 Min:2.036321 Max:2.073193 Average:2.052350[3000/055] Median:2.058694 Min:2.039742 Max:2.071381 Average:2.055151[3000/075] Median:2.044610 Min:2.031621 Max:2.072240 Average:2.046833[3000/100] Median:2.051081 Min:2.041319 Max:2.070420 Average:2.053736[3500/025] Median:2.115581 Min:2.085326 Max:2.142923 Average:2.111501[3500/045] Median:2.062000 Min:2.050305 Max:2.079701 Average:2.061979[3500/050] Median:2.057308 Min:2.047556 Max:2.071838 Average:2.058065[3500/055] Median:2.058585 Min:2.047697 Max:2.072047 Average:2.058442[3500/075] Median:2.060888 Min:2.051787 Max:2.070400 Average:2.060377[3500/100] Median:2.067534 Min:2.056189 Max:2.081957 Average:2.068087[4096/025] Median:2.101917 Min:2.078075 Max:2.139795 Average:2.104907[4096/045] Median:2.071588 Min:2.053040 Max:2.078682 Average:2.068556[4096/050] Median:2.065348 Min:2.055219 Max:2.108920 Average:2.069129[4096/055] Median:2.062510 Min:2.056066 Max:2.077945 Average:2.064237[4096/075] Median:2.076443 Min:2.067886 Max:2.111381 Average:2.080161[4096/100] Median:2.090874 Min:2.078334 Max:2.160026 Average:2.095570
正如您所看到的,READ_BUFFER_SIZE = 2048 * 2048和N_WORKERS = 75,在某些运行中甚至可以达到不到 2 秒。
现在使用获胜者配置进行最终测试,我将运行次数增加到 55 次,并关闭了计算机中除终端之外的所有内容。结果是:
优化 | 时间(以秒为单位) |
单线程 | 95.000000 |
Goroutine | 8.327305 |
自定义字节分割 | 5.526411 |
自定义字节哈希 | 4.237007 |
解析浮点数 | 3.079632 |
自定义扫描 | 2.825991 |
自定义哈希值 | 2.717442 |
Swiss Map | 2.677549 |
DJB2 hash | 2.588396 |
无名称和临时缓冲区 | 2.070337 |
网格测试 | 1.969090 |
参加 10 亿行挑战对我来说真的很有趣,我没想到会取得如此好的成绩,因为我在此类优化方面的经验很少甚至没有。此外,对我来说,结果更加令人难以置信,因为我并没有像最好的 Java 解决方案那样太费心单独操作字节。
最后版本的代码如下所示:
package mainimport ( "fmt" "io" "os" "r2p/utils" "slices" "sort" "sync" "github.com/dolthub/swiss")const READ_BUFFER_SIZE = 2048 * 2048const N_WORKERS = 75type TrashItem struct { Idx int Value []byte Initial bool}type StationData struct { Name string Min int Max int Sum int Count int}var lock = &sync.Mutex{}var lockIdx = 0func trashBin(input chan *TrashItem, output chan *swiss.Map[uint64, *StationData], wg *sync.WaitGroup) { defer wg.Done() data := swiss.NewMap[uint64, *StationData](1024) can := []*TrashItem{} buffer := make([]byte, 1024) for item := range input { can = append(can, item) can = saveCan(can, data, buffer) } output <- data}func saveCan(can []*TrashItem, data *swiss.Map[uint64, *StationData], buffer []byte) []*TrashItem { for i, ref := range can { if ref.Idx == 0 { _, nameInit, nameEnd, tempInit, tempEnd := nextLine(0, ref.Value) processLine(ref.Value[nameInit:nameEnd], ref.Value[tempInit:tempEnd], data) return slices.Delete(can, i, i+1) } for j, oth := range can { if ref.Idx == oth.Idx && i != j { if ref.Initial { copy(buffer[:len(ref.Value)], ref.Value) copy(buffer[len(ref.Value):], oth.Value) } else { copy(buffer[:len(oth.Value)], oth.Value) copy(buffer[len(oth.Value):], ref.Value) } total := len(ref.Value) + len(oth.Value) end, nameInit, nameEnd, tempInit, tempEnd := nextLine(0, buffer) processLine(buffer[nameInit:nameEnd], buffer[tempInit:tempEnd], data) if end < total { _, nameInit, nameEnd, tempInit, tempEnd := nextLine(end, buffer) processLine(buffer[nameInit:nameEnd], buffer[tempInit:tempEnd], data) } if i > j { can = slices.Delete(can, i, i+1) can = slices.Delete(can, j, j+1) } else { can = slices.Delete(can, j, j+1) can = slices.Delete(can, i, i+1) } return can } } } return can}func consumer(file *os.File, trash chan *TrashItem, output chan *swiss.Map[uint64, *StationData], wg *sync.WaitGroup) { defer wg.Done() data := swiss.NewMap[uint64, *StationData](1024) readBuffer := make([]byte, READ_BUFFER_SIZE) for { lock.Lock() lockIdx++ idx := lockIdx n, err := file.Read(readBuffer) lock.Unlock() if err == io.EOF { break } if err != nil { panic(err) } // ignoring first line start := 0 for i := 0; i < n; i++ { if readBuffer[i] == 10 { start = i + 1 break } } trash <- &TrashItem{idx - 1, readBuffer[:start], false} // ignoring last line final := 0 for i := n - 1; i >= 0; i-- { if readBuffer[i] == 10 { final = i break } } trash <- &TrashItem{idx, readBuffer[final+1 : n], true} readingIndex := start for readingIndex < final { next, nameInit, nameEnd, tempInit, tempEnd := nextLine(readingIndex, readBuffer) readingIndex = next processLine(readBuffer[nameInit:nameEnd], readBuffer[tempInit:tempEnd], data) } } output <- data}func nextLine(readingIndex int, reading []byte) (nexReadingIndex, nameInit, nameEnd, tempInit, tempEnd int) { i := readingIndex nameInit = readingIndex for reading[i] != 59 { // ; i++ } nameEnd = i i++ // skip ; tempInit = i for i < len(reading) && reading[i] != 10 { // \n i++ } tempEnd = i readingIndex = i + 1 return readingIndex, nameInit, nameEnd, tempInit, tempEnd}func processLine(name, temperature []byte, data *swiss.Map[uint64, *StationData]) { temp := bytesToInt(temperature) id := hash(name) station, ok := data.Get(id) if !ok { data.Put(id, &StationData{string(name), temp, temp, temp, 1}) } else { if temp < station.Min { station.Min = temp } if temp > station.Max { station.Max = temp } station.Sum += temp station.Count++ }}func run() { outputChannels := make([]chan *swiss.Map[uint64, *StationData], N_WORKERS+1) // Read file file, err := os.Open("measurements.txt") if err != nil { panic(err) } defer file.Close() var wg sync.WaitGroup var wgTrash sync.WaitGroup wg.Add(N_WORKERS) wgTrash.Add(1) trash := make(chan *TrashItem, N_WORKERS*2) output := make(chan *swiss.Map[uint64, *StationData], 1) go trashBin(trash, output, &wgTrash) outputChannels[0] = output for i := 0; i < N_WORKERS; i++ { output := make(chan *swiss.Map[uint64, *StationData], 1) go consumer(file, trash, output, &wg) outputChannels[i+1] = output } wg.Wait() close(trash) wgTrash.Wait() for i := 0; i < N_WORKERS+1; i++ { close(outputChannels[i]) } data := swiss.NewMap[uint64, *StationData](1000) for i := 0; i < N_WORKERS+1; i++ { m := <-outputChannels[i] m.Iter(func(station uint64, stationData *StationData) bool { v, ok := data.Get(station) if !ok { data.Put(station, stationData) } else { if stationData.Min < v.Min { v.Min = stationData.Min } if stationData.Max > v.Max { v.Max = stationData.Max } v.Sum += stationData.Sum v.Count += stationData.Count } return false }) } printResult(data)}func hash(name []byte) uint64 { var h uint64 = 5381 for _, b := range name { h = (h << 5) + h + uint64(b) } return h}func printResult(data *swiss.Map[uint64, *StationData]) { result := make(map[string]*StationData, data.Count()) keys := make([]string, 0, data.Count()) data.Iter(func(k uint64, v *StationData) (stop bool) { keys = append(keys, v.Name) result[v.Name] = v return false }) sort.Strings(keys) print("{") for _, k := range keys { v := result[k] fmt.Printf("%s=%.1f/%.1f/%.1f, ", k, float64(v.Min)/10, (float64(v.Sum)/10)/float64(v.Count), float64(v.Max)/10) } print("}\n")}func bytesToInt(byteArray []byte) int { var result int negative := false for _, b := range byteArray { if b == 46 { // . continue } if b == 45 { // - negative = true continue } result = result*10 + int(b-48) } if negative { return -result } return result}func main() { f, err := os.Create("cpu_profile.prof") if err != nil { panic(err) } defer f.Close() if err := pprof.StartCPUProfile(f); err != nil { panic(err) } defer pprof.StopCPUProfile() started := time.Now() run() fmt.Printf("%0.6f\n", time.Since(started).Seconds())}
原文地址
:https://dev.to/r2p/one-billion-row-challenge-in-golang-from-95s-to-196s-4980