golang实现大文件读取的代码示例

 更新时间:2024年04月03日 09:56:29   作者:白马啸西施  
在实际工作,我们需要读取大数据文件,文件可能上G百G,所以我们不可能一次性的读取到内存,接下来本文给大家介绍了golang实现大文件读取的示例,需要的朋友可以参考下

在实际工作,我们需要读取大数据文件,文件可能上G百G,所以我们不可能一次性的读取到内存,io.readAll不可用,那么我们可以考虑分块,IO流的方式如io.copy.

对比两者:

io.ReadAll:

io.ReadAll 是一个方便的函数,可以将整个文件内容一次性读取到内存中,并返回一个字节切片。这在处理小文件或者需要一次性加载数据的情况下非常适用。然而,对于大文件,使用 io.ReadAll 可能会导致以下问题:

  • 内存消耗:读取大文件可能导致内存消耗急剧增加,甚至超出可用内存限制。
  • 性能问题:应用程序的响应性可能下降,用户可能会感到应用程序不再响应。
  • 延迟问题:大文件的读取需要更多时间,可能导致较长的延迟。

io.Copy:

io.Copy 函数通过逐块的方式从源读取数据并将其写入目标,适用于流式传输大文件。它具有以下优势:

  • 低内存消耗:io.Copy 逐块处理数据,不需要将整个文件加载到内存中,从而降低内存消耗。
  • 高性能:流式传输提高了读取和写入的效率,适用于需要高性能处理大文件的情况。
  • 更好的响应性:io.Copy 不会一次性阻塞等待整个文件读取完成,从而提高应用程序的响应性

示例:

package test
 
import (
	"fmt"
	"io"
	"os"
	"runtime"
	"testing"
)
 
func largeFileRead(_file string) {
	f, err := os.Open(_file)
	if err != nil {
		fmt.Errorf("打开文件错误:%v", err)
		return
	}
 
	defer f.Close()
 
	// 读取数据大写
	buffer := make([]byte, 4096)
	for {
		getMemory()
		n, err := f.Read(buffer)
		if err != nil && err != io.EOF {
			fmt.Errorf("读取文件错误:%v", err)
			return
		}
 
		if n == 0 {
			break
		}
 
		fmt.Println("内容:", string(buffer))
	}
	fmt.Println("读取完成")
}
 
func getMemory() {
	// 获取内存信息
	var m runtime.MemStats
	runtime.ReadMemStats(&m)
	fmt.Printf("%d KB\n", m.Alloc/1024)
}
 
func Test_largeFileRead(t *testing.T) {
	fileName := "D:xxxx.txt"
	largeFileRead(fileName)
}

运行结果:

实时内存占用:854KB,文件大小102M

拓展:Golang并发读取超大文件

当今世界的任何计算机系统每天都会生成大量的日志或数据。随着系统的发展,将调试数据存储到数据库中是不可行的,因为它们是不可变的,并且只能用于分析和解决故障。所以大部分公司倾向于将日志存储在文件中,而这些文件通常位于本地磁盘中。

我们将使用Go语言,从一个大小为16GB的.txt或.log文件中提取日志。

让我们开始编码……

首先,我们打开文件。对于任何文件的IO,我们都将使用标准的Go库os.File。

f, err := os.Open(fileName)
 if err != nil {
   fmt.Println("cannot able to read the file", err)
   return
 }
// UPDATE: close after checking error
defer file.Close()  //Do not forget to close the file

打开文件后,我们有以下两个选项可以选择:

  • 逐行读取文件,这有助于减少内存紧张,但需要更多的时间。
  • 一次将整个文件读入内存并处理该文件,这将消耗更多内存,但会显著减少时间。

由于文件太大,即16 GB,因此无法将整个文件加载到内存中。但是第一种选择对我们来说也是不可行的,因为我们希望在几秒钟内处理文件。

但你猜怎么着,还有第三种选择。瞧……相比于将整个文件加载到内存中,在Go语言中,我们还可以使用bufio.NewReader()将文件分块加载。

r := bufio.NewReader(f)
for {
buf := make([]byte,4*1024) //the chunk size
n, err := r.Read(buf) //loading chunk into buffer
   buf = buf[:n]
if n == 0 {
   
     if err != nil {
       fmt.Println(err)
       break
     }
     if err == io.EOF {
       break
     }
     return err
  }
}

一旦我们将文件分块,我们就可以分叉一个线程,即Go routine,同时处理多个文件区块。上述代码将修改为:

//sync pools to reuse the memory and decrease the preassure on Garbage Collector
linesPool := sync.Pool{New: func() interface{} {
        lines := make([]byte, 500*1024)
        return lines
}}
stringPool := sync.Pool{New: func() interface{} {
          lines := ""
          return lines
}}
slicePool := sync.Pool{New: func() interface{} {
           lines := make([]string, 100)
           return lines
}}
r := bufio.NewReader(f)
var wg sync.WaitGroup //wait group to keep track off all threads
for {
     
     buf := linesPool.Get().([]byte)
     n, err := r.Read(buf)
     buf = buf[:n]
if n == 0 {
        if err != nil {
            fmt.Println(err)
            break
        }
        if err == io.EOF {
            break
        }
        return err
     }
nextUntillNewline, err := r.ReadBytes('\n')//read entire line
     
     if err != io.EOF {
         buf = append(buf, nextUntillNewline...)
     }
     
     wg.Add(1)
     go func() { 
      
        //process each chunk concurrently
        //start -> log start time, end -> log end time
        
        ProcessChunk(buf, &linesPool, &stringPool, &slicePool,     start, end)
wg.Done()
     
     }()
}
wg.Wait()
}

上面的代码,引入了两个优化点:

  • sync.Pool是一个强大的对象池,可以重用对象来减轻垃圾收集器的压力。我们将重用各个分片的内存,以减少内存消耗,大大加快我们的工作。
  • Go Routines帮助我们同时处理缓冲区块,这大大提高了处理速度。

现在让我们实现ProcessChunk函数,它将处理以下格式的日志行。

2020-01-31T20:12:38.1234Z, Some Field, Other Field, And so on, Till new line,...\n

我们将根据命令行提供的时间戳提取日志。

func ProcessChunk(chunk []byte, linesPool *sync.Pool, stringPool *sync.Pool, slicePool *sync.Pool, start time.Time, end time.Time) {
//another wait group to process every chunk further                             
      var wg2 sync.WaitGroup
logs := stringPool.Get().(string)
logs = string(chunk)
linesPool.Put(chunk) //put back the chunk in pool
//split the string by "\n", so that we have slice of logs
      logsSlice := strings.Split(logs, "\n")
stringPool.Put(logs) //put back the string pool
chunkSize := 100 //process the bunch of 100 logs in thread
n := len(logsSlice)
noOfThread := n / chunkSize
if n%chunkSize != 0 { //check for overflow 
         noOfThread++
      }
length := len(logsSlice)
//traverse the chunk
     for i := 0; i < length; i += chunkSize {
         
         wg2.Add(1)
//process each chunk in saperate chunk
         go func(s int, e int) {
            for i:= s; i<e;i++{
               text := logsSlice[i]
if len(text) == 0 {
                  continue
               }
           
            logParts := strings.SplitN(text, ",", 2)
            logCreationTimeString := logParts[0]
            logCreationTime, err := time.Parse("2006-01-  02T15:04:05.0000Z", logCreationTimeString)
if err != nil {
                 fmt.Printf("\n Could not able to parse the time :%s       for log : %v", logCreationTimeString, text)
                 return
            }
// check if log's timestamp is inbetween our desired period
          if logCreationTime.After(start) && logCreationTime.Before(end) {
          
            fmt.Println(text)
           }
        }
        textSlice = nil
        wg2.Done()
     
     }(i*chunkSize, int(math.Min(float64((i+1)*chunkSize), float64(len(logsSlice)))))
   //passing the indexes for processing
}  
   wg2.Wait() //wait for a chunk to finish
   logsSlice = nil
}

对上面的代码进行基准测试。以16 GB的日志文件为例,提取日志所需的时间约为25秒。

完整的代码示例如下:

func main() {

 s := time.Now()
 args := os.Args[1:]
 if len(args) != 6 { // for format  LogExtractor.exe -f "From Time" -t "To Time" -i "Log file directory location"
  fmt.Println("Please give proper command line arguments")
  return
 }
 startTimeArg := args[1]
 finishTimeArg := args[3]
 fileName := args[5]

 file, err := os.Open(fileName)
 
 if err != nil {
  fmt.Println("cannot able to read the file", err)
  return
 }
 
 defer file.Close() //close after checking err
 
 queryStartTime, err := time.Parse("2006-01-02T15:04:05.0000Z", startTimeArg)
 if err != nil {
  fmt.Println("Could not able to parse the start time", startTimeArg)
  return
 }

 queryFinishTime, err := time.Parse("2006-01-02T15:04:05.0000Z", finishTimeArg)
 if err != nil {
  fmt.Println("Could not able to parse the finish time", finishTimeArg)
  return
 }

 filestat, err := file.Stat()
 if err != nil {
  fmt.Println("Could not able to get the file stat")
  return
 }

 fileSize := filestat.Size()
 offset := fileSize - 1
 lastLineSize := 0

 for {
  b := make([]byte, 1)
  n, err := file.ReadAt(b, offset)
  if err != nil {
   fmt.Println("Error reading file ", err)
   break
  }
  char := string(b[0])
  if char == "\n" {
   break
  }
  offset--
  lastLineSize += n
 }

 lastLine := make([]byte, lastLineSize)
 _, err = file.ReadAt(lastLine, offset+1)

 if err != nil {
  fmt.Println("Could not able to read last line with offset", offset, "and lastline size", lastLineSize)
  return
 }

 logSlice := strings.SplitN(string(lastLine), ",", 2)
 logCreationTimeString := logSlice[0]

 lastLogCreationTime, err := time.Parse("2006-01-02T15:04:05.0000Z", logCreationTimeString)
 if err != nil {
  fmt.Println("can not able to parse time : ", err)
 }

 if lastLogCreationTime.After(queryStartTime) && lastLogCreationTime.Before(queryFinishTime) {
  Process(file, queryStartTime, queryFinishTime)
 }

 fmt.Println("\nTime taken - ", time.Since(s))
}

func Process(f *os.File, start time.Time, end time.Time) error {

 linesPool := sync.Pool{New: func() interface{} {
  lines := make([]byte, 250*1024)
  return lines
 }}

 stringPool := sync.Pool{New: func() interface{} {
  lines := ""
  return lines
 }}

 r := bufio.NewReader(f)

 var wg sync.WaitGroup

 for {
  buf := linesPool.Get().([]byte)

  n, err := r.Read(buf)
  buf = buf[:n]

  if n == 0 {
   if err != nil {
    fmt.Println(err)
    break
   }
   if err == io.EOF {
    break
   }
   return err
  }

  nextUntillNewline, err := r.ReadBytes('\n')

  if err != io.EOF {
   buf = append(buf, nextUntillNewline...)
  }

  wg.Add(1)
  go func() {
   ProcessChunk(buf, &linesPool, &stringPool, start, end)
   wg.Done()
  }()

 }

 wg.Wait()
 return nil
}

func ProcessChunk(chunk []byte, linesPool *sync.Pool, stringPool *sync.Pool, start time.Time, end time.Time) {

 var wg2 sync.WaitGroup

 logs := stringPool.Get().(string)
 logs = string(chunk)

 linesPool.Put(chunk)

 logsSlice := strings.Split(logs, "\n")

 stringPool.Put(logs)

 chunkSize := 300
 n := len(logsSlice)
 noOfThread := n / chunkSize

 if n%chunkSize != 0 {
  noOfThread++
 }

 for i := 0; i < (noOfThread); i++ {

  wg2.Add(1)
  go func(s int, e int) {
   defer wg2.Done() //to avaoid deadlocks
   for i := s; i < e; i++ {
    text := logsSlice[i]
    if len(text) == 0 {
     continue
    }
    logSlice := strings.SplitN(text, ",", 2)
    logCreationTimeString := logSlice[0]

    logCreationTime, err := time.Parse("2006-01-02T15:04:05.0000Z", logCreationTimeString)
    if err != nil {
     fmt.Printf("\n Could not able to parse the time :%s for log : %v", logCreationTimeString, text)
     return
    }

    if logCreationTime.After(start) && logCreationTime.Before(end) {
     //fmt.Println(text)
    }
   }
   

  }(i*chunkSize, int(math.Min(float64((i+1)*chunkSize), float64(len(logsSlice)))))
 }

 wg2.Wait()
 logsSlice = nil
}

到此这篇关于golang实现大文件读取的代码示例的文章就介绍到这了,更多相关golang大文件读取内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 读取Go项目中的配置文件的方法

    读取Go项目中的配置文件的方法

    本文主要介绍了读取Go项目中的配置文件的方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2022-06-06
  • golang基础之Interface接口的使用

    golang基础之Interface接口的使用

    这篇文章主要介绍了golang基础之Interface接口的使用,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2018-07-07
  • Go项目在linux服务器的部署详细步骤

    Go项目在linux服务器的部署详细步骤

    在今天的软件开发中,使用Linux作为操作系统的比例越来越高,而Golang语言则因为其高效、简洁和并发性能等特点,也被越来越多的开发者所青睐,这篇文章主要给大家介绍了关于Go项目在linux服务器的部署详细步骤,需要的朋友可以参考下
    2023-09-09
  • 使用go语言实现Redis持久化的示例代码

    使用go语言实现Redis持久化的示例代码

    redis 是一个内存数据库,如果你把进程杀掉,那么里面存储的数据都会消失,那么这篇文章就是来解决 redis 持久化的问题,本文给大家介绍了使用go语言实现Redis持久化,需要的朋友可以参考下
    2024-07-07
  • Go中crypto/rsa库的高效使用指南

    Go中crypto/rsa库的高效使用指南

    本文主要介绍了Go中crypto/rsa库的高效使用指南,从 RSA 的基本原理到 crypto/rsa 库的实际应用,具有一定的参考价值,感兴趣的可以了解一下
    2024-02-02
  • Golang截取字符串方法示例讲解及对比

    Golang截取字符串方法示例讲解及对比

    这篇文章主要介绍了Golang截取字符串方法,文中介绍了使用rune函数和utf包以及range遍历的方式,熟练掌握这些可以帮助我们更方便地处理字符串,提高编程效率和代码质量,感兴趣的同学可以参考下文
    2023-05-05
  • Go外部依赖包从vendor,$GOPATH和$GOPATH/pkg/mod查找顺序

    Go外部依赖包从vendor,$GOPATH和$GOPATH/pkg/mod查找顺序

    这篇文章主要介绍了Go外部依赖包vendor,$GOPATH和$GOPATH/pkg/mod下查找顺序,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2020-12-12
  • mayfly-go部署和使用详解

    mayfly-go部署和使用详解

    这篇文章主要介绍了mayfly-go部署和使用详解,此处部署基于CentOS7.4部署,结合实例代码图文给大家讲解的非常详细,需要的朋友可以参考下
    2022-09-09
  • Golang channel底层实现过程解析(深度好文)

    Golang channel底层实现过程解析(深度好文)

    Go语言为了方便使用者,提供了简单、安全的协程数据同步和通信机制,这篇文章主要介绍了Golang channel底层是如何实现的,需要的朋友可以参考下
    2024-07-07
  • Go语言实现顺序存储的线性表实例

    Go语言实现顺序存储的线性表实例

    这篇文章主要介绍了Go语言实现顺序存储的线性表的方法,实例分析了Go语言实现线性表的定义、插入、删除元素等的使用技巧,具有一定参考借鉴价值,需要的朋友可以参考下
    2015-03-03

最新评论