How to read a chunk of lines from a file in Python

This article shows a Python generator that reads lines from a file one chunk, or block, at a time, letting you easily parallelize a program that analyzes a very large file without having to split that file into separate smaller files first.

Suppose you have a large text file where each line is a row of data, and you want to process all this data to come up with some kind of statistics. In most cases like this, what you have is an “embarrassingly parallel problem”, and you could easily make your program faster by following these steps:

  1. Split your data into chunks, or blocks.
  2. Process these blocks separately, producing an intermediate result for each block.
  3. Combine the per-block results to produce the result for the whole data set.
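
For example, here is a toy in-memory illustration of the pattern, with made-up numbers standing in for the rows of the file and a plain sum standing in for the real analysis:

data = list(range(1000000))   # made-up data standing in for the rows of the file
number_of_blocks = 4
n = len(data)

# 1. Split the data into blocks.
blocks = [data[i * n // number_of_blocks:(i + 1) * n // number_of_blocks]
          for i in range(number_of_blocks)]

# 2. Process each block separately, producing an intermediate result.
partial_sums = [sum(block) for block in blocks]

# 3. Combine the intermediate results into the final result.
total = sum(partial_sums)
assert total == sum(data)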

This is how the “Map/Reduce” model works, and this is the kind of problem you can solve with Hadoop Streaming, with Pig, or by writing a script to run on AWS EMR, or something like that.

The problem is that to use these tools you first of all must have the whole environment set up nicely, your data must be stored in the right place, and, most importantly, your data must already have been split up into chunks. That can be quite a nuisance if all you have is a single large file you want to analyse and you are not dealing with tons of computers on a network. All you want is to distribute the load over the few processors in your personal computer, for example.

So this is the problem scenario: I have a single large file I want to process, and all I want to do is write one single Python script to do the job. I have coded all the analysis, and I can already process the big file in a single process. Now I want to use the multiprocessing module to parallelize the work.

So we originally have some code like this:

fp = open(filename)
for line in fp:
    process(line)

What I created is a generator that takes a file object and iterates over the lines of the n-th chunk of the file. The first step is to find out the file size, which is done by seeking to the end of the file. We then multiply the file size by the index of the chunk we want to process and divide by the number of chunks; that gives us the byte offset where the chunk starts. We then “fast-forward” to the beginning of the next line and start yielding the chunk lines until we reach the beginning of the next chunk, or the end of the file. Here is the generator, and how to use it:

def file_block(fp, number_of_blocks, block):
    '''
    A generator that splits a file into blocks and iterates
    over the lines of one of the blocks.
    '''

    assert 0 < number_of_blocks
    assert 0 <= block < number_of_blocks

    fp.seek(0, 2)                  # seek to the end of the file
    file_size = fp.tell()

    ini = file_size * block // number_of_blocks
    end = file_size * (1 + block) // number_of_blocks

    if ini <= 0:
        fp.seek(0)
    else:
        fp.seek(ini - 1)
        fp.readline()              # discard the partial line at the start of the chunk

    while fp.tell() < end:
        yield fp.readline()

if __name__ == '__main__':
    fp = open(filename, 'rb')      # binary mode, so seek()/tell() work on byte offsets
    number_of_chunks = 4
    for chunk_number in range(number_of_chunks):
        print(chunk_number, 100 * '=')
        for line in file_block(fp, number_of_chunks, chunk_number):
            process(line)

Of course, right now all that code does is process the lines of each chunk sequentially, and I am actually just printing the lines to debug. The next step is to actually use the multiprocessing module and replace the outer loop with distributed processing.
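
Just to sketch where this is going, here is one minimal way that step could look, reusing the file_block generator above. The per-line work (a byte count) is only a placeholder for the real analysis, and each worker receives the file name rather than the open file object, since open file objects cannot be pickled and sent to another process:

import multiprocessing

def process_chunk(filename, number_of_chunks, chunk_number):
    # Each worker opens the file for itself; only the file name, which is
    # picklable, has to cross the process boundary.
    with open(filename, 'rb') as fp:
        result = 0
        for line in file_block(fp, number_of_chunks, chunk_number):
            result += len(line)    # placeholder for the real per-line analysis
        return result

if __name__ == '__main__':
    number_of_chunks = 4
    with multiprocessing.Pool(number_of_chunks) as pool:
        partial_results = pool.starmap(
            process_chunk,
            [(filename, number_of_chunks, chunk_number)
             for chunk_number in range(number_of_chunks)])
    print(sum(partial_results))    # combine the per-chunk results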

I think this is a neat example of “hiding complexity”. We had a line iterator at first; now we have a line iterator for the lines of one chunk, which can very easily be used for multiprocessing. That generator function is small, but it hides away quite a lot of complexity: finding the file size, making sure we start reading at the beginning of a line, and so on. Writing it was one of the coolest “Python experiences” I have had recently.


12 thoughts on “How to read a chunk of lines from a file in Python”

  1. Shalin Shah

    Hello, this is a really good way to divide a file into chunks, but the thing is my text file has around a million lines and this code reads the entire file in the first chunk; I mean, the entire file is treated as a single line. So is there any way to read a specific number of bytes each time?

    1. nlw0 Post author

      Hello, Shalin. Thanks for writing, and I’m sorry I didn’t get to reply to you earlier.

      I don’t think I quite understand your problem. Does your file have separate lines or not? fp.readline() should pick up just one line at a time. Are you sure you don’t have a problem with the proper identification of the newline character?

      If you don’t have line separations, i.e. records divided by a ‘\n’, then you need to replace the calls to readline() with something that reads a smaller quantity of bytes. That may be a constant-size block, or something else controlled by your application.
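
      For example, something along these lines could work as a starting point; the name file_block_bytes and the block_size value are just examples:

      def file_block_bytes(fp, number_of_blocks, block, block_size=64 * 1024):
          '''
          Like file_block, but yields fixed-size byte blocks instead of lines,
          for files with no newline separators. Open fp in binary mode.
          '''
          fp.seek(0, 2)
          file_size = fp.tell()

          ini = file_size * block // number_of_blocks
          end = file_size * (1 + block) // number_of_blocks

          fp.seek(ini)
          while fp.tell() < end:
              # Never read past the end of this chunk.
              yield fp.read(min(block_size, end - fp.tell()))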

  2. Shalin Shah

    Hello, yes, my file doesn’t have any newline character, i.e. it contains millions of bytes without a newline, but I think I got the answer: as you said, read a specific number of bytes each time. Thanks.

    1. nlw0 Post author

      Hello, Prashant. When we cut the file we choose starting points with no regard to where each line ends, so when we start processing a chunk we usually land somewhere in the middle of a line. What we do then is discard the rest of that line, which is what the fp.readline() call right after fp.seek(ini - 1) does. That line will already have been read in full as the last line of the previous chunk.

  3. kevin

    Thanks, worked for me. Extending it for parallel processing was relatively trivial using the Pool.map method.

  4. Stefan

    Hi – thanks for your approach. The linear version worked well. My parallelization approach however does not seem to work:

    Using:

    results = [pool.apply_async(processChunk, args=(filename,number_of_chunks,chunk_number,)) for chunk_number in range(number_of_chunks)]

    where processChunk then iterates over the lines of file_block(open(filename), number_of_chunks, chunk_number). This is indeed faster than the linear version. However, it is slower than just using

    file = open(filename)
    results = pool.map_async(processLine, file, numberOfLinesPerChunk)

    (2.863969087600708 sec vs 0.9955136775970459 sec)

    I am pretty sure I made a mistake when using apply_async – could you give me some advice on how to solve this problem?

    1. nic Post author

      Hi Stefan. If I understand correctly, map_async will do precisely what we want, splitting the input into chunks for different workers. The difference is that there is probably a centralized thread reading the entire file into memory and then handing the chunks to the separate workers. The approach from the post is more suitable for the case where you have a file shared between different machines and you want each worker to find its own chunk without a central process reading the entire file and finding all the chunk limits. If you do that on a single machine, first of all your task may already be I/O-bound instead of CPU-bound, so you should make sure of that first; and with this approach you will be opening the same file again and again on the same machine, which might even create an overhead that makes it the slower approach. That’s my guess. Please do tell us if you find out what is going on there!

      1. Stefan

        Thank you for replying so fast.

        My problem is that the function (here processChunk) does not seem to be called when an opened file (here the variable file) is handed over instead of the file name. processChunk is only a loop function which iteratively calls processLine:

        def processChunk(chunk):
            print("processChunk called")
            result = 0
            for line in chunk:
                result = result + processLine(line)
            return result

        The following iterative case returns the correct result:

        for chunk_number in range(number_of_chunks):
            results.append(processChunk(file_block(file, number_of_chunks, chunk_number)))

        The results array then contains the results (number_of_chunks of them). However, changing this to either

        results = [pool.apply_async(processChunk, args=(file_block(file, number_of_chunks, chunk_number),)) for chunk_number in range(number_of_chunks)]

        or the corresponding map_async case leads to neither the processChunk nor the file_block function being called, and the results object is empty. Therefore I thought that I had to open the file for each worker/core separately, which seemed to be the only solution that actually worked.

        To make this short: what approach did you choose in your code?

  5. Stefan

    Just to update: the map_async / apply_async methods do not seem to work with your approach as is. The problem is that the parameter fp, which stores the opened text file, cannot be serialized (which it has to be for the file_block function to be called in parallel). Therefore, you have to pass the serializable file name of the text file, so that each worker then opens the file for itself.
    Nevertheless, thanks for your help.
