如何使用Golang实现并发数据导入_Golang多goroutine文件解析示例

直接开大量goroutine解析CSV会崩溃,因无并发控制导致文件偏移量冲突、内存溢出及数据库连接超限;应通过chan分发任务、独立csv.Reader、带行号的错误定位和资源限制来解决。

为什么直接开一堆 goroutine 解析 CSV 会崩

因为没加并发控制,runtime.GOMAXPROCS 默认是 CPU 核数,但文件解析本身不耗 CPU,主要卡在 I/O 和内存分配上。开几百个 goroutine 读同一个 *os.File,会出现 seek: invalid argument 或数据错乱——多个 goroutine 共享文件偏移量,互相覆盖读取位置。

  • 别用 bufio.Scanner 在多个 goroutine 里共用一个 bufio.Reader
  • 别把整个大文件一次性 ReadAll 进内存再切分,OOM 风险高
  • 每行解析后若要写入数据库,必须控制写入并发度,否则 MySQL 报 Too many connections

sync.WaitGroup + chan 拆分文件并行处理

核心思路:主线程按行或按块(如每 1000 行)切分文件,把行内容发给 worker channel;worker goroutine 从 channel 拿数据、解析、组装结构体、提交到下游(DB / slice / channel)。

file, _ := os.Open("data.csv")
defer file.Close()
reader := bufio.NewReader(file)

// 启动 4 个 worker jobs := make(chan []string, 100) results := make(chan error, 100) var wg sync.WaitGroup

for w := 0; w < 4; w++ { wg.Add(1) go func() { defer wg.Done() for line := range jobs { // 解析 CSV 行:跳过空行、处理引号、转义等 if len(line) == 0 { continue } record := parseCSVLine(line) // 自定义函数 if err := insertToDB(record); err != nil { results <- err return } } }() }

// 主线程逐行读,分发 job for { line, err := reader.ReadString('\n') if err == io.EOF { break } if err != nil { panic(err) } fields, _ := csv.NewReader(strings.NewReader(line)).Read() select { case jobs <- fields: default: // channel 满了就阻塞,避免内存暴涨 time.Sleep(10 * time.Millisecond) jobs <- fields } } close(jobs) wg.Wait() close(results)

csv.Reader 本身不是并发安全的,但可以每个 goroutine 持有独立实例

如果你确定文件可随机访问(比如已按字节范围切好),更稳妥的做法是:预读文件,按 \n 找出行边界,把 []byte 切片传给每个 goroutine,各自 new 一个 csv.Reader 解析——这样完全隔离,无共享状态。

  • bytes.Split(data, []byte{'\n'}) 切分比用 bufio.Scanner 更可控
  • 注意最后一行可能没换行符,需单独处理
  • csv.ReaderFieldsPerRecordTrailingComma 要按实际数据设,否则解析失败静默丢数据

导入失败时如何定位哪一行出错

不要只返回 error,要把原始行号、原始字符串、解析上下文一起打包。否则日志里看到 parse int: invalid syntax 完全不知道是第几行、什么内容。

type ParseError struct {
    LineNum int
    RawLine string
    Err     error
}

func parseCSVLine(fields []string) (Record, error) { if len(fields) < 3 { return Record{}, &ParseError{LineNum: currentLine, RawLine: strings.Join(fields, ","), Err: fmt.Errorf("too few fields")} } id, err := strconv.Atoi(fields[0]) if err != nil { return Record{}, &ParseError{LineNum: currentLine, RawLine: strings.Join(fields, ","), Err: err} } return Record{ID: id, Name: fields[1], Email: fields[2]}, nil }

真正难的不是起 goroutine,而是让每条错误能反查到原始输入位置,以及控制住内存和连接数。文件越大数据越容易在第 3 万行突然卡住——那往往不是代码逻辑问题,是没设 context.WithTimeout 或 DB 连接池 SetMaxOpenConns