NYC Taxi Journey


NYC Taxi Analysis Dashboard GitHub Repo

The NYC Taxi Trip Data dataset is one of the most famous datasets in the open domain, mostly due to its massive size. It’s put together by the NYC Open Data Initiative and NYC Taxi and Limousine Commission. The data is updated semi-annually and contains data for each month.

My main interest in this dataset was its massive size. It’s over 80 GB of CSVs and over 1 billion rows! This was a threshold that I wanted to cross: processing over a billion records into a useful analysis. I set out to accomplish:

  • 1) Process the CSVs and produce a usable output in under 1 hour with my current CPU.

  • 2) Visualize it in Bokeh (specifically using an interactive map).

  • 3) Learn Python’s big data processing ecosystem.

It’s always said that you should monitor and log your experiences, in hopes of others gaining some knowledge from you. In this write-up, I outline my process of navigating this dataset and identify some ways beginners could improve their code.

The second phase of the ETL transforms the data in much the same way as the first. The main purpose is to add extra features based on the pickup and dropoff datetime columns. The things that I wanted to filter on in the visualization were month, hour of drop-off/pick-up, year, day of the week and whether it was a holiday or not. The main bottleneck of this part of the process was parsing the datetime columns (one each for pick-up and drop-off) into usable datetime objects. Pandas implements this in a line-by-line fashion, which causes a problem when dealing with billions of records.

Originally, I implemented my solution very naively. I had a lot of trouble converting the datatype on read using pd.read_csv(parse_dates=['date_column']), because the dataset would often not fit into memory. Since the datetime column was a string, I thought that slicing the string, appending the pieces to lists and attaching those lists to the dataframe as columns would be enough. This ended up being extremely slow, as you’re iterating over the entire dataframe, parsing two strings, creating lists, then attaching the lists as columns. This approach caused memory to blow up (list creation in Python is expensive) and took hours. I knew there must be a better way.

Next I used pd.to_datetime() with the format parameter set, which made a huge difference because pandas no longer had to infer the format every time. But I gave back all of the gains by iterating over every row of the dataframe again, parsing the features (month, hour, year, etc.) from each datetime object. Still slow, and in fact, not even consistently faster.
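
A minimal sketch of that explicit-format parse. The column name pickup_datetime and the timestamp layout are illustrative assumptions, not taken from the original script:

```python
import pandas as pd

# Hypothetical sample; the column name and timestamp layout are assumptions.
trips = pd.DataFrame(
    {"pickup_datetime": ["2016-07-01 00:15:30", "2016-12-25 18:45:00"]}
)

# Passing an explicit format means pandas does not have to infer it.
trips["pickup_datetime"] = pd.to_datetime(
    trips["pickup_datetime"], format="%Y-%m-%d %H:%M:%S"
)
```

The whole column is converted in one call, so there is no Python-level loop over rows at this stage.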

Then I realized that I should be using pandas’ built-in vectorized datetime operations. They made this part of the process really flow, decreasing processing time by more than 3x. The earlier slowness was mostly due to the custom functions I had written to apply to each Series, which came from not thinking deeply enough about how to solve the problem with the vectorized operations available to numpy arrays and the datetime functionality.
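
The vectorized version can be sketched with the .dt accessor. The column names below are hypothetical, and using USFederalHolidayCalendar for the holiday flag is an assumption about what counts as a holiday, not necessarily the definition used in this project:

```python
import pandas as pd
from pandas.tseries.holiday import USFederalHolidayCalendar

# Hypothetical sample with an already-parsed datetime column.
trips = pd.DataFrame(
    {"pickup_datetime": pd.to_datetime(["2016-07-04 08:30:00", "2016-07-09 23:05:00"])}
)
pickup = trips["pickup_datetime"]

# Each feature is computed for the whole column at once, with no row loop.
trips["pickup_year"] = pickup.dt.year
trips["pickup_month"] = pickup.dt.month
trips["pickup_hour"] = pickup.dt.hour
trips["pickup_dow"] = pickup.dt.dayofweek  # Monday is 0, Sunday is 6

# Assumption: "holiday" means an observed US federal holiday.
holidays = USFederalHolidayCalendar().holidays(
    start=pickup.min().normalize(), end=pickup.max().normalize()
)
trips["pickup_is_holiday"] = pickup.dt.normalize().isin(holidays)
```

The same pattern would apply to the drop-off column.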

Finally, when working on the third part of the ETL, I realized that I was still working with too much data. I wasn’t going to meet my goal of ~1 hour from raw CSVs to data usable by the visualization app. A lot of the columns were adding bloat, and I could pass that processing on to the visualization layer. This included indicator columns like Season, a Weekend indicator, and Day of the Month (the last of which was unused in the visualization layer). Removing the datetime object columns saved the most space: the needed information was already parsed into its own columns, so the larger, more costly columns weren’t needed.


The performance up to this point was woeful at best. I had gotten the medium-sized dataset of the three (Green, Yellow, FHV) down to ~40 minutes. All three in under an hour seemed undoable without utilizing more cores. One of the main reasons I wanted to try this analysis was to use Dask. It didn’t disappoint.

Dask tries to parallelize your Python code, and it does a great job of it. When testing with the Green dataset, it worked perfectly, easily parallelizing my function across all cores, and providing a helpful dashboard along with it. The API makes it really easy to test for the optimal combination of threads and processes.

Scaling up to the Yellow dataset brought many challenges. I had poor results, probably due to my misunderstanding of Dask. I often encountered MemoryErrors and it was difficult to find solutions without spending vast amounts of time running the process before it failed. Dask recommends not using ddf.compute() to return large dataframes (as this blows up memory), but that was my use case and I was unsure how to get what I wanted using aggregations. I decided to change course and use something else to parallelize.

I’ve used the threading and multiprocessing libraries in the past when I thought that Dask would be too heavy for a particular job, so I switched to multiprocessing (Dask taught me that threads weren’t the way to go). It’s fairly simple to build functions that execute on all cores and so I implemented this approach.

This approach was great as it allowed me to optimize how many rows were to be processed per batch for the best efficiency. This multiprocessing method only sends small amounts of data to each core, while Dask sends many tasks at once. The new approach lowered processing time by almost 20% and resulted in almost no MemoryErrors.
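
A rough sketch of this batching pattern, using the standard multiprocessing.Pool. The function names, the pickup_datetime column and the per-batch transform are all hypothetical stand-ins, and the batch size is the knob to tune:

```python
import multiprocessing as mp

import pandas as pd


def add_features(batch):
    """Hypothetical per-batch transform: derive hour and weekday."""
    out = batch.copy()
    out["pickup_hour"] = out["pickup_datetime"].dt.hour
    out["pickup_dow"] = out["pickup_datetime"].dt.dayofweek
    return out


def iter_batches(df, batch_size):
    """Yield consecutive row slices of at most batch_size rows."""
    for start in range(0, len(df), batch_size):
        yield df.iloc[start:start + batch_size]


def process_in_batches(df, batch_size, workers):
    """Run add_features on each batch in a pool of worker processes."""
    with mp.Pool(processes=workers) as pool:
        # imap preserves batch order; each task carries one batch.
        parts = list(pool.imap(add_features, iter_batches(df, batch_size)))
    return pd.concat(parts)


if __name__ == "__main__":
    demo = pd.DataFrame(
        {"pickup_datetime": pd.date_range("2016-07-01", periods=10, freq="D")}
    )
    result = process_in_batches(demo, batch_size=4, workers=2)
    print(result.shape)
```

Timing a few values of batch_size on a sample of the data is one way to find the most efficient setting for a given machine.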

I was disappointed in not being able to find a solution to it via Dask, but happy I was able to use the right tool for the job. At this point, I was closing in on 1.5 hrs total processing time for just the first two parts (of 3 total) across each of the datasets.

Download

I sourced the data going back to July 2016 (when the format of the data became favourable for the analysis that I wanted to run). My internet connection is decently quick, but not fast enough for ad-hoc streaming of hundreds of gigs every time I wanted to run this analysis, so I downloaded the data from the TLC webpage. I wrote a script to download all of the months that I wanted and left it overnight since it can easily take hours. You can find the script here.
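
For illustration only, a download loop along these lines could look like the sketch below. It is not the linked script: BASE_URL is a placeholder, and the file naming scheme is an assumption that should be checked against the links on the TLC page.

```python
from pathlib import Path
from urllib.request import urlretrieve

# Placeholder base URL (hypothetical); use the location the TLC page links to.
BASE_URL = "https://example.com/trip-data"
VEHICLE_TYPES = ("green", "yellow", "fhv")


def month_range(start_year, start_month, end_year, end_month):
    """Yield (year, month) pairs from start to end, inclusive."""
    year, month = start_year, start_month
    while (year, month) <= (end_year, end_month):
        yield year, month
        month += 1
        if month > 12:
            year, month = year + 1, 1


def file_name(vehicle, year, month):
    """Assumed naming scheme, e.g. yellow_tripdata_2016-07.csv."""
    return f"{vehicle}_tripdata_{year}-{month:02d}.csv"


def download_all(dest, start, end):
    """Fetch every monthly file per vehicle type, skipping files on disk."""
    dest = Path(dest)
    dest.mkdir(parents=True, exist_ok=True)
    for year, month in month_range(*start, *end):
        for vehicle in VEHICLE_TYPES:
            name = file_name(vehicle, year, month)
            target = dest / name
            if not target.exists():
                urlretrieve(f"{BASE_URL}/{name}", target)
```

Calling download_all("data/raw", (2016, 7), (2018, 12)) would walk every month in that range; skipping files that already exist makes an interrupted overnight run safe to restart.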

ETL Part 1

The download script wrote a bunch of CSVs into a directory to be worked on. In the PyData ecosystem, CSVs are common, but a more performant practice is to switch to either SQL tables or parquet files. When choosing between SQLite3 and the PyArrow implementation of parquet, parquet seemed to produce faster reads. I grouped the CSV files by type of vehicle (Green, Yellow, and FHV), consolidated column names (they changed across years and months), dropped rows with missing data and converted some of the numeric data types to their smaller, less memory-intensive forms. Each output was a parquet file with all of that vehicle type’s trips across the entire time span. This step ultimately reduced the size of the data being worked with by about 84%, from 82 GB to 13.2 GB.


| Vehicle type / dataset | FHV | Yellow | Green |
| --- | --- | --- | --- |
| # of rows in parquet file | 601,330,268 | 371,889,042 | 35,185,411 |
| Size of CSVs (GB) | 47.5 | 31.6 | 2.96 |
| Size of parquet (GB) | 8.92 | 3.92 | 0.367 |
| CSV to parquet reduction (%) | 81.3 | 87.5 | 87.6 |
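
The cleanup steps from ETL Part 1 (consolidating column names, dropping incomplete rows, downcasting numeric types, writing parquet) might be sketched as follows. The alias mapping, column names and chunk size are hypothetical, and the final concat assumes the cleaned data for one vehicle type fits in memory:

```python
import pandas as pd

# Hypothetical mapping from header variants to one canonical name.
COLUMN_ALIASES = {
    "lpep_pickup_datetime": "pickup_datetime",
    "tpep_pickup_datetime": "pickup_datetime",
    "Passenger_count": "passenger_count",
}


def clean_chunk(chunk):
    """Unify column names, drop incomplete rows, shrink numeric dtypes."""
    chunk = chunk.rename(columns=COLUMN_ALIASES).dropna().copy()
    for col in chunk.columns:
        if pd.api.types.is_integer_dtype(chunk[col]):
            chunk[col] = pd.to_numeric(chunk[col], downcast="integer")
        elif pd.api.types.is_float_dtype(chunk[col]):
            chunk[col] = pd.to_numeric(chunk[col], downcast="float")
    return chunk


def csvs_to_parquet(csv_paths, out_path, chunksize=1_000_000):
    """Clean each CSV in chunks, then write one parquet file (needs pyarrow)."""
    parts = []
    for path in csv_paths:
        for chunk in pd.read_csv(path, chunksize=chunksize):
            parts.append(clean_chunk(chunk))
    pd.concat(parts, ignore_index=True).to_parquet(out_path, engine="pyarrow")
```

Because downcasting happens per chunk, two chunks can end up with different integer widths for the same column; pd.concat then widens to the larger type, so the result is still consistent.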