In this post, I will cover getting open source COVID-19 data for the United States using Python. The data pipeline demonstrated here is a very simple example and could easily be adapted into a Prefect, Apache NiFi or Apache Airflow ETL process.
Data Search
A quick search on DuckDuckGo turned up The COVID Tracking Project, among other sources I will explore later.
Other sources:
- https://ourworldindata.org/coronavirus-source-data
- https://datascience.nih.gov/covid-19-open-access-resources
- https://github.com/nytimes/covid-19-data
- https://datausa.io/coronavirus
I also would like to get population information for each state so I can assess the COVID data for each state with respect to its population. Another search led me to the World Population Review website, which provides population information across multiple international and domestic dimensions in a downloadable format (CSV and JSON).
Data API and Download
Python Packages Used
For this post, I will use the following Python packages in my script:
import urllib.request
import pandas as pd
COVID API
After reading through the website's API documentation, I constructed the following in Python to pull the data. Since I wanted all daily state data, I opted for the all-states historical daily values. The API also lets you query the US as a whole or a specific state, and the data is available in either CSV or JSON format. The page also documents each field and notes whether a field has been deprecated.
base_url = 'https://api.covidtracking.com'
all_states = '/v1/states/daily.csv'
# state_specific_api = '/v1/states/{state}/daily.csv'
If current daily data for all states better suited the project needs, I could use the following as well:
current_states = '/v1/states/current.csv'
current_state = '/v1/states/{state}/current.csv'
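The '{state}' placeholder can be filled in for a single-state query. The API appears to expect lowercase two-letter state codes (e.g. 'ca'), so a quick sketch of building that URL might look like this:
# Hypothetical single-state example: fill in the '{state}' placeholder
ca_daily_url = base_url + '/v1/states/{state}/daily.csv'.format(state='ca')
print(ca_daily_url)  # https://api.covidtracking.com/v1/states/ca/daily.csv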
Finally, I download the data. Modify the URL argument, destination filename, and location to suit your needs.
base_dir = '/home/linux/ProblemXSolutions.com/DataProjects/covid19'
filename = '/data/all_states_daily_covid_py.csv'
urllib.request.urlretrieve(url=base_url + all_states,
filename=base_dir + filename)
Population Data
Next, I get the population data by state. The site did not list an API, so I downloaded the file to the same location as the COVID data. When I attempted to fetch the CSV with Python's urllib.request.urlretrieve function, I encountered an error, so I downloaded it manually instead.
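The exact error isn't recorded here, but a common cause is the server rejecting the default urllib user agent. If you want to attempt the download programmatically anyway, a hedged sketch of a workaround looks like the following; note the URL is a placeholder, not the real download link.
# Hypothetical workaround if urlretrieve is rejected (e.g. HTTP 403 from the default user agent)
pop_url = 'https://example.com/state_populations.csv'  # placeholder URL, not the real one
req = urllib.request.Request(pop_url, headers={'User-Agent': 'Mozilla/5.0'})
with urllib.request.urlopen(req) as response:
    with open(base_dir + '/data/state_populations.csv', 'wb') as out_file:
        out_file.write(response.read())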
State Look Up Table
After looking at the data in each dataset, I decided I needed a simple lookup table to translate state names to their abbreviated form. Since I already had one on hand, it was easy to reuse.
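The lookup table itself isn't shown in this post, but based on the merge code later on it appears to have a 'state' column with full names and a 'state_abr' column with abbreviations. A minimal sketch of that structure, with just a couple of illustrative rows:
# Illustrative only: a tiny version of the lookup table with the columns the merge below expects
state_lu_example = pd.DataFrame({
    'state': ['California', 'New York'],
    'state_abr': ['CA', 'NY']
})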
Reading in the Data
Now that I have all my data, it's time to read in each file.
state_data = pd.read_csv(base_dir + filename, low_memory=False)
state_pop_data = pd.read_csv(base_dir + '/data/state_populations.csv')
state_name_lookup_data = pd.read_csv(base_dir + '/data/state_lu.csv')
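The next sections mention exploring each dataset before reducing it; those exploration steps aren't in the original script, but a minimal sketch might look like this:
# Quick exploration (not part of the original script)
print(state_data.shape, state_pop_data.shape, state_name_lookup_data.shape)
print(state_pop_data.columns.tolist())
print(state_pop_data.head())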
Initial Processing
State Population Data Processing
After exploring the state population data, I decided to reduce the dataset to what I need using the following code:
state_pop_reduced = state_pop_data.filter(items=['State', 'Pop', 'density'])
del state_pop_data
# Strip whitespace so the join keys match; pandas merge/read_csv don't trim strings the way the R version did
state_pop_reduced.loc[:, 'State'] = state_pop_reduced.loc[:, 'State'].str.strip()
state_name_lookup_data.loc[:, 'state'] = state_name_lookup_data.loc[:, 'state'].str.strip()
state_pop_reduced = pd.merge(left=state_pop_reduced, right=state_name_lookup_data,
how='left',
left_on='State',
right_on='state')
state_pop_reduced.drop(columns=['State', 'state'],
inplace=True)
state_pop_reduced.rename(columns={'state_abr':'state'},
inplace=True)
If you have read the R version of this post or looked at each of the scripts, this version is a little messier, but the results are the same.
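One optional addition (not in the original script): after a left join, any state name that failed to match the lookup table ends up with a missing abbreviation, so a quick check can catch that.
# Sanity check: after the rename, 'state' holds the abbreviation from the lookup table,
# so NaNs here indicate state names that didn't match
unmatched = state_pop_reduced[state_pop_reduced['state'].isna()]
print(f'{len(unmatched)} rows without a matching state abbreviation')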
State COVID Data Processing
After exploring the COVID-19 data, I decided to remove some of the original fields and calculate some new ones. First, I join the population data onto the COVID data by state.
state_data.loc[:, 'state'] = state_data.loc[:, 'state'].str.strip()
state_pop_reduced.loc[:, 'state'] = state_pop_reduced.loc[:, 'state'].str.strip()
state_data_enhanced = pd.merge(left=state_data,
right=state_pop_reduced,
on = 'state',
how='left')
del state_data
Then I transform the date field, which is in a "yyyymmdd" format that was read in as a numeric datatype. A simple fix is to parse the field with pd.to_datetime to convert it to a proper date format.
state_data_enhanced['date'] = pd.to_datetime(state_data_enhanced['date'], format='%Y%m%d')
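A quick optional check (my addition, not in the original script) to confirm the conversion worked:
# The dtype should now be datetime64[ns] rather than int64
print(state_data_enhanced['date'].dtype)
print(state_data_enhanced['date'].min(), state_data_enhanced['date'].max())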
Data Enhancement Part 1
Next, I sort the data by state and date before performing a series of transformations to create new calculated fields. Using the assign function, I create the "daily_recover" field by grouping the data by state and taking the diff of the original "recovered" field. This matches the dataset's existing fields that capture daily changes rather than just accumulated recovery numbers.
state_data_enhanced = \
    state_data_enhanced \
    .sort_values(by=['state', 'date']) \
    .assign(
        # Use a lambda so the diff runs on the sorted frame produced above,
        # not on the original (unsorted) state_data_enhanced
        daily_recover = lambda x: x.groupby('state')['recovered'].diff()
    ) \
    .assign(
        daily_cases_adj = lambda x: (x['positiveIncrease'] / x['Pop']) * 100000,
        daily_recover_adj = lambda x: (x['daily_recover'] / x['Pop']) * 100000,
        daily_deaths_adj = lambda x: (x['deathIncrease'] / x['Pop']) * 100000
    )
Next, within a second assign, I calculate the adjusted numbers (cases, recoveries and deaths) based on state-specific populations. The idea behind this is to get some normalization between states, since population sizes can vary quite a bit. The same data can tell two different stories depending on what is being conveyed and how the results are interpreted: what matters more, actual volume or per capita volume? To calculate per capita numbers I used the following formula:
(daily_{cases,recover,deaths} / population of state) * 100,000
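For example, a hypothetical state reporting 100 new cases in a day with a population of 5,000,000 would show (100 / 5,000,000) * 100,000 = 2 daily cases per 100,000 residents.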
Then I roll into another assign call to calculate the rolling averages of daily cases, recoveries, and deaths. This helps smooth out the day-to-day noise in the numbers and is typically what you see in various charts in the media or other graphics on the internet. The reason for breaking up the assign calls is two-fold. It groups like transformation operations together, in this case daily numbers versus rolling averages. It also matters when I need to compute on a field just created in an assign call: that computation needs to go in a separate call, otherwise the transformation will not see the new field created in the same assign.
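One caveat worth noting: that restriction applies when the new columns are passed in as pre-computed Series built from the original DataFrame, as in the rolling-average code below. If the columns are supplied as callables, recent pandas versions (0.23+ on Python 3.6+) evaluate the assign keyword arguments in order, so a later lambda can see a column created earlier in the same call. A minimal sketch with made-up column names, reusing the pd import from above:
df = pd.DataFrame({'a': [1, 2, 3]})
df = df.assign(
    b=lambda x: x['a'] * 2,   # new column
    c=lambda x: x['b'] + 1    # sees 'b' because callables are evaluated in order
)
print(df)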
Data Enhancement Part 2
I broke up the original R piping into two separate sets of operations: the first set creates fields that are used in the second set. The purpose is similar in Python, and grouping like operations together also makes the code easier to read and debug.
In the final round of transformations, I perform another assign on the previous calculations to get the rolling 7-day averages based on both actual volume and per capita numbers.
state_data_enhanced = \
    state_data_enhanced \
    .assign(
        active_roll7 = state_data_enhanced.groupby('state')['positiveIncrease']
            .transform(lambda x: x.rolling(window=7, min_periods=1).mean()),
        # Rolling average of the raw daily recoveries (the per capita version comes next)
        recovered_roll7 = state_data_enhanced.groupby('state')['daily_recover']
            .transform(lambda x: x.rolling(window=7, min_periods=1).mean()),
        deaths_roll7 = state_data_enhanced.groupby('state')['deathIncrease']
            .transform(lambda x: x.rolling(window=7, min_periods=1).mean())
    ) \
    .assign(
        active_roll7_adj = state_data_enhanced.groupby('state')['daily_cases_adj']
            .transform(lambda x: x.rolling(window=7, min_periods=1).mean()),
        recovered_roll7_adj = state_data_enhanced.groupby('state')['daily_recover_adj']
            .transform(lambda x: x.rolling(window=7, min_periods=1).mean()),
        deaths_roll7_adj = state_data_enhanced.groupby('state')['daily_deaths_adj']
            .transform(lambda x: x.rolling(window=7, min_periods=1).mean())
    )
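A quick spot check (my addition, assuming the dataset uses upper-case state abbreviations such as 'NY') to eyeball the raw and per capita rolling averages for one state:
ny = state_data_enhanced[state_data_enhanced['state'] == 'NY']
print(ny[['date', 'positiveIncrease', 'active_roll7', 'daily_cases_adj', 'active_roll7_adj']].tail(10))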
Data Reduction
With all the fields in the data, I decided to reduce the columns to the ones I'm interested in. I always have access to the original data, so I'm not concerned; if I need to, I can revisit this list and reprocess.
state_enhanced_reduced = state_data_enhanced.filter(
items=[
'date', 'state',
'positive', 'recovered', 'death',
'positiveIncrease', 'daily_cases_adj',
'active_roll7', 'active_roll7_adj',
'daily_recover', 'daily_recover_adj',
'recovered_roll7', 'recovered_roll7_adj',
'deathIncrease', 'daily_deaths_adj',
'deaths_roll7', 'deaths_roll7_adj',
'hospitalizedIncrease', 'hospitalizedCurrently',
'inIcuCurrently'
]
)
Write Out the Processed Data
Now that I have my initially processed data, I write out the results for ease of use in follow-on work. You will notice I have appended "_py" to the filenames to differentiate these outputs from the files produced by the other language versions. This makes it easier to verify and validate the outputs against each other.
state_data_enhanced.to_csv(base_dir + '/data/state_data_enhanced_py.csv')
state_enhanced_reduced.to_csv(base_dir + '/data/state_enhanced_reduced_py.csv')
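One optional tweak (my suggestion, not in the original script): pass index=False so pandas doesn't write the row index as an extra unnamed column, which makes it easier to compare the Python output against the R and Julia versions.
state_data_enhanced.to_csv(base_dir + '/data/state_data_enhanced_py.csv', index=False)
state_enhanced_reduced.to_csv(base_dir + '/data/state_enhanced_reduced_py.csv', index=False)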
GitHub Link
- https://github.com/problemxsolutions/covid19/blob/master/scripts/r/covid_data_processing.R
- https://github.com/problemxsolutions/covid19/blob/master/scripts/python/covid_data_processing.py
- https://github.com/problemxsolutions/covid19/blob/master/scripts/julia/covid_data_processing.jl