In this post, I will demonstrate loading my criminal activity data into ElasticSearch so it can be explored, analyzed and visualized in Kibana.
For instructions on installing and configuring the Elastic (formerly ELK) Stack, see my previous post. Although this post will specifically reference the crime data from my PostgreSQL database, I will include additional scripts in my GitHub repo for the project that you can reference later.
Loading Data Into ElasticSearch
The rest of this tutorial assumes you have installed and configured at least ElasticSearch and Kibana. I am running ElasticSearch v7.10.
Starting Services
The first thing we need to do is start our PostgreSQL and Elastic Stack services. If you only want to run a sub-stack, you can start just the ElasticSearch and Kibana services as shown below.
sudo systemctl start postgresql@13-main
sudo systemctl start elasticsearch.service
sudo systemctl start kibana.service
Once the services are running, we can open Kibana at http://localhost:5601/ in a browser, provided you have not modified the default settings. ElasticSearch itself listens on port 9200 by default.
Connecting to PostgreSQL
As demonstrated in previous posts, we use the helper function I created to establish a connection to the PostgreSQL database. Whenever I need a connection, I can call pg_connect(). Please refer to the function documentation for more information about my default configurations, as well as how to modify them for your specific use case.
To connect to my ElasticSearch instance I use the elastic package provided by rOpenSci. This package provides all the functionality I need to connect, create/delete/update an index and load data. I can also submit data mappings should I have any specific datatype I need to call out. In my case, I want to expose the location (latitude, longitude) of the criminal activity and ensure my datetime field format is accepted as the right datatype.
pg_connect()
es_connect <- elastic::connect()
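If your cluster is not on the default host and port, or you want to confirm it is reachable before going further, the connection can be made explicit. The following is a minimal sketch, assuming ElasticSearch is listening on 127.0.0.1:9200 (the package defaults). The wrapper name connect_es is hypothetical and is not one of my helper functions.

```r
# Hypothetical helper: connect to ElasticSearch and fail early if the
# cluster does not answer. Host and port are assumptions (package defaults).
connect_es <- function(host = "127.0.0.1", port = 9200) {
  conn <- elastic::connect(host = host, port = port)
  # ping() requests the cluster's root endpoint; an unreachable cluster
  # raises an error, which is turned into a clearer message here.
  info <- tryCatch(
    conn$ping(),
    error = function(e) {
      stop("ElasticSearch is not reachable at ", host, ":", port,
           " (", conditionMessage(e), ")", call. = FALSE)
    }
  )
  message("Connected to ElasticSearch version ", info$version$number)
  conn
}

# Usage (requires a running cluster):
# es_connect <- connect_es()
```

Failing at connection time is easier to diagnose than a failed bulk load several steps later.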
Defining DataType Mappings
In the following JSON-formatted string, I define the mapping properties of the location and datetime fields from my data. The location field is not currently present in my data, but the ElasticSearch documentation for the geo-point datatype shows how to create it. To be sure my datetime field is indexed as a date, I also declared it as a date datatype and specified the format used. Reference the date datatype documentation for more details.
index_body <- '{
"mappings" : {
"properties" : {
"location" : { "type" : "geo_point"},
"datetime" : {
"type" : "date",
"format" : "yyyy-MM-dd HH:mm:ss"
}
}
}
}'
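A typo in the mapping body only surfaces once ElasticSearch rejects the request, so it can help to check the string locally first. The sketch below assumes the jsonlite package is installed. It validates the JSON, lists the declared type of each field, and shows an R timestamp rendered in the layout the "datetime" mapping expects. The example timestamp is made up for illustration.

```r
library(jsonlite)

index_body <- '{
  "mappings": {
    "properties": {
      "location": { "type": "geo_point" },
      "datetime": {
        "type": "date",
        "format": "yyyy-MM-dd HH:mm:ss"
      }
    }
  }
}'

# validate() returns TRUE when the string is syntactically valid JSON
stopifnot(jsonlite::validate(index_body))

# Parse the body and list the declared type of each mapped field
properties <- jsonlite::fromJSON(index_body)$mappings$properties
field_types <- vapply(properties, function(p) p$type, character(1))
print(field_types)

# An R timestamp rendered in the layout the "datetime" mapping expects
# (the timestamp itself is made up for illustration)
example_time <- as.POSIXct("2020-03-15 21:05:00", tz = "UTC")
example_string <- format(example_time, "%Y-%m-%d %H:%M:%S")
print(example_string)
```

This only checks JSON syntax and the date layout. Whether the mapping itself is acceptable is still decided by ElasticSearch when the index is created.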
Create Index
Next, using the mapping properties, we can define the desired index name and create the destination index that the crime data will populate. In the code below I provide the method for how you would delete an index if it already exists and you want to start over. The method for creating the index at the desired ElasticSearch connection follows. After running the command, you will get an acknowledgement response. If successful, you can continue on.
index_name <- 'crime_data'
# Delete index if it exists or you want to start over.
# index_delete(conn = es_connect,
# index = index_name)
# Create index at the given connection for the given
# index name and specified data mappings.
index_create(conn = es_connect,
index = index_name,
body = index_body)
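The delete-then-create steps above can also be wrapped so that an existing index is never removed by accident. This is a minimal sketch, not the code from my script: the function name create_index_safely and its overwrite argument are hypothetical, and it assumes the parsed response from index_create carries an acknowledged flag.

```r
# Hypothetical helper: create an index, optionally replacing an existing one.
create_index_safely <- function(conn, index, body, overwrite = FALSE) {
  if (elastic::index_exists(conn, index)) {
    if (!overwrite) {
      stop("Index '", index, "' already exists; ",
           "set overwrite = TRUE to replace it.", call. = FALSE)
    }
    # Deleting the index also deletes every document stored in it
    elastic::index_delete(conn, index)
  }
  response <- elastic::index_create(conn, index = index, body = body)
  # Assumption: a successful creation is reported as acknowledged = TRUE
  if (!isTRUE(response$acknowledged)) {
    stop("ElasticSearch did not acknowledge creation of '", index, "'.",
         call. = FALSE)
  }
  invisible(response)
}

# Usage (requires a running cluster):
# create_index_safely(es_connect, "crime_data", index_body, overwrite = TRUE)
```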
Sub-setting Data
Now that I have created my destination index for the crime data, I can begin loading. To reduce the volume of data I am loading into R and ElasticSearch at any one point in time, I’ll query my data by year from the “START_DATE” field. If you recall from my data exploration post, I created a “crime_start_date” table where I transformed the original string date field into a real date/time datatype.
Subset Columns
This next portion will get the column headers from the “crime” table. I will filter them to exclude certain columns that I don’t think provide much use for my project. I am also dropping the “START_DATE” field since I will get the real datetime field from another table.
After I subset my columns, I build the columns to conform to my SELECT statement that will be used later. This involves collapsing the vector into a single string. Since my column headers are uppercase, I need to put quotes around them (PostgreSQL thing).
# Get all the field names for the table
table_col_names <- dbListFields(conn = pg_connect(),
name = db_table)
# Determine which fields are not necessary for the ElasticSearch Database.
# Modify as you wish.
columns_to_exclude <- c('XBLOCK', 'YBLOCK', 'BLOCK',
'BLOCK_GROUP', 'START_DATE')
# Get the subset of column names
table_col_names_subset <-
table_col_names[!(table_col_names %in% columns_to_exclude)]
# Combine the vector names into a string, formatted for the database query.
query_select <-
str_c(table_col_names_subset, collapse = '", "')
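To make the result of the collapse concrete, here is a small self-contained sketch in base R. The column vector is a made-up subset standing in for the output of dbListFields(), and paste() is used as the base R equivalent of str_c().

```r
# Made-up subset of column names, standing in for dbListFields() output
table_col_names <- c("OFFENSE", "LATITUDE", "LONGITUDE",
                     "XBLOCK", "YBLOCK", "BLOCK", "BLOCK_GROUP", "START_DATE")
columns_to_exclude <- c("XBLOCK", "YBLOCK", "BLOCK",
                        "BLOCK_GROUP", "START_DATE")

# Keep only the columns that are not in the exclusion list
table_col_names_subset <-
  table_col_names[!(table_col_names %in% columns_to_exclude)]

# Base R equivalent of str_c(..., collapse = '", "').
# Note the result has no leading or trailing quote yet.
query_select <- paste(table_col_names_subset, collapse = '", "')
print(query_select)

# The SELECT statement supplies the outer quotes
select_clause <- paste0('SELECT "', query_select, '"')
print(select_clause)
```

The collapsed string deliberately lacks its outer quotes, because the query string built later adds them around the whole list.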
Subset Rows
Wrapped in a for-loop, I created an ETL process to query my data by year from my database (extract), and perform a table modification (transform) before loading the data into ElasticSearch (load).
In the first block, you can see the SQL statement I am using to select my data from both the “crime” and “crime_start_date” tables. I filter the “crime_start_date” table on the year inside the subquery, which reduces the number of rows before the join. I also include two fields from the “crime_start_date” table in my SELECT statement.
query_string <- paste0('SELECT \"', query_select, '\", ',
'df2.datetime, df2.date ',
'FROM ', db_table, ' ',
'INNER JOIN (',
'SELECT * FROM crime_start_date ',
'WHERE year = ', i,
') as df2 ',
'ON df2."START_DATE" = ',
db_table, '."START_DATE"',
';')
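Because the year is pasted directly into the SQL text, it is worth making sure only a whole number can get there. The sketch below wraps the same query in a function. The name build_year_query is hypothetical, and the table name "crime" and the three columns are assumptions used for illustration.

```r
# Hypothetical helper: build the per-year query used inside the loop
build_year_query <- function(year, db_table, query_select) {
  # Coerce to integer so that only a whole number can reach the SQL text
  year <- as.integer(year)
  stopifnot(length(year) == 1, !is.na(year))
  paste0(
    'SELECT "', query_select, '", ',
    'df2.datetime, df2.date ',
    'FROM ', db_table, ' ',
    'INNER JOIN (',
    'SELECT * FROM crime_start_date ',
    'WHERE year = ', year,
    ') as df2 ',
    'ON df2."START_DATE" = ',
    db_table, '."START_DATE"',
    ';'
  )
}

# Example with an assumed table name and a made-up column subset
query_string <- build_year_query(
  2015,
  db_table = "crime",
  query_select = 'OFFENSE", "LATITUDE", "LONGITUDE'
)
cat(query_string, "\n")
```

Printing the assembled string once before running the loop is a quick way to catch a missing space or quote.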
Now that the query string is constructed, I submit the query and get the results.
db_data_subset <-
RPostgres::dbGetQuery(conn = pg_connect(),
statement = query_string)
Before loading the data, I need to make sure I create my “location” field to conform to what I previously defined in my property mapping. This would be a great spot to perform any additional modifications to the data.
db_data_subset %<>%
mutate(location = paste0(LATITUDE,',', LONGITUDE))
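One thing to watch for at this step: if a row has a missing coordinate, paste0() produces a string such as "NA,-77.01", which is not a valid geo-point. The following is a minimal base R sketch that leaves the location missing in that case. The data frame is made up for illustration, and I am not claiming my dataset contains such rows.

```r
# Made-up coordinates for illustration; the second row has a missing value
db_data_subset <- data.frame(
  OFFENSE   = c("HOMICIDE", "HOMICIDE"),
  LATITUDE  = c(38.9, NA),
  LONGITUDE = c(-77.03, -77.01)
)

has_coords <- !is.na(db_data_subset$LATITUDE) &
  !is.na(db_data_subset$LONGITUDE)

# "lat,lon" is one of the string forms accepted for a geo_point field.
# Rows without both coordinates get a missing location instead of "NA,...".
db_data_subset$location <- ifelse(
  has_coords,
  paste0(db_data_subset$LATITUDE, ",", db_data_subset$LONGITUDE),
  NA_character_
)
print(db_data_subset$location)
```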
Load Data Into ElasticSearch
Using the docs_bulk function from the elastic package, I can specify the ElasticSearch connection, index destination and data. There are additional function arguments that can be passed as well. I chose to pass es_ids = T to allow ElasticSearch to create UUIDs for each record. This is not necessary since it is the default, but I wanted to expose it for the purpose of this tutorial. I wrapped the call in the invisible function so the returned response is not printed. The function still displays its progress in the console during the loading process.
invisible(
docs_bulk(
conn = es_connect,
index = index_name,
x = db_data_subset,
es_ids = T # assigns ElasticSearch UUIDs (True by default)
)
)
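If you would rather not see the progress bar either, docs_bulk has arguments for that. The sketch below is an assumption-labeled variant, not the call from my script: the wrapper name load_quietly is hypothetical, and it relies on the quiet and chunk_size arguments of docs_bulk as I understand them from the package documentation.

```r
# Hypothetical wrapper around docs_bulk() that loads without console output
load_quietly <- function(conn, index, data, chunk_size = 1000) {
  invisible(
    elastic::docs_bulk(
      conn = conn,
      index = index,
      x = data,
      es_ids = TRUE,            # let ElasticSearch assign document IDs
      chunk_size = chunk_size,  # rows sent per bulk request
      quiet = TRUE              # suppress the progress bar
    )
  )
}

# Usage (requires a running cluster):
# load_quietly(es_connect, index_name, db_data_subset)
```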
Track Loading Progress
I did include a print statement to let me know which year was being processed and how many records are currently in the index after the loading of each subset has completed. Here is what the output looks like.
print(paste("finished uploading crime data from :", i))
elastic::count(conn = es_connect, index = index_name) %>%
paste("Total Records in Index:", .) %>%
print
When all the data is loaded and my for-loop completes I can see the entire completed progression output.
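For reference, the overall shape of the loop can be sketched as below. This is not my exact script: the function run_yearly_etl and the three step functions are hypothetical, and stand-in steps are used so the loop runs without a database or a cluster. It also tracks rows processed in R rather than asking the index for its document count.

```r
# Generic year-by-year ETL loop. The three step functions are supplied by
# the caller; all names here are hypothetical.
run_yearly_etl <- function(years, extract_fn, transform_fn, load_fn) {
  total_loaded <- 0
  for (i in years) {
    data_subset <- extract_fn(i)              # query one year of data
    data_subset <- transform_fn(data_subset)  # e.g. add the "location" field
    load_fn(data_subset)                      # e.g. send to ElasticSearch
    total_loaded <- total_loaded + nrow(data_subset)
    print(paste("finished uploading crime data from :", i))
    print(paste("Rows loaded so far:", total_loaded))
  }
  invisible(total_loaded)
}

# Stand-in step functions so the loop can be run without any services
fake_extract <- function(year) {
  data.frame(year = rep(year, 3), LATITUDE = 38.9, LONGITUDE = -77.03)
}
fake_transform <- function(df) {
  df$location <- paste0(df$LATITUDE, ",", df$LONGITUDE)
  df
}
loaded <- list()
fake_load <- function(df) loaded[[length(loaded) + 1]] <<- df

total <- run_yearly_etl(2009:2011, fake_extract, fake_transform, fake_load)
```

In the real script, the extract step would run the query for year i, the transform step would add the location field, and the load step would call docs_bulk.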
At this point I can transition to working in the Kibana web interface (http://localhost:5601/).
ElasticSearch and Kibana Interface
When you first open up the web interface you should see one of the following. I previously created a space called Project Crime to separate this effort from other planned projects.
ElasticSearch Index Management
Before I created the index and loaded my data, I could see what indices already existed in ElasticSearch. Simply navigate to “Stack Management” and then “Index Management”.
Once you create the index, you should be able to see it in the list of indices. When loading starts, you can refresh the indices to view the document count and storage size of the data. If you have lots of indices, you can filter by typing in the index search field. In my case I would type in “crime”, which would bring up the only index, “crime_data”. In the image below you can see the index with its first year of data already loaded. There were 31214 records and it took up 9.1 MB.
After my R script completes the loading process, I can recheck my index to see that I have 404684 records at 129.4 MB. At this point my data is stored in the NoSQL database.
Create Index Pattern in Kibana
The next step is to create an index pattern in Kibana. Navigate to the “Kibana” section in the left pane then select “Index Patterns”.
Now I need to create an index pattern.
In step 1 of 2, I define the index pattern. My particular instance is simple, but I could have also created an index for each year and named them “crime_data_{year}”. For this instance I defined my index pattern as “crime_data*”, which matches all indices that begin with the base name. We can also see which indices match as we enter the text.
In the next step we define configurations. In my case, I configured the time field to “datetime”.
After the Kibana index has been associated with the ElasticSearch index, we can inspect the fields and their datatypes, format, searchability and aggregatability.
After scrolling to the bottom, I can see my “datetime” and “location” fields are showing up as the correct datatypes that I defined in the mapping.
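The same check can be done from R by inspecting the mapping that ElasticSearch reports for the index. The sketch below is a hypothetical helper, mapped_field_types, that pulls the declared type of each field out of a mapping response. It assumes the response shape used by ElasticSearch 7.x and runs here against a hand-built stand-in rather than a live cluster.

```r
# Pull the declared type of each field out of a mapping response.
# `mapping` is assumed to have the ElasticSearch 7.x shape:
# list(<index> = list(mappings = list(properties = list(<field> = list(type = ...)))))
mapped_field_types <- function(mapping, index) {
  properties <- mapping[[index]]$mappings$properties
  vapply(
    properties,
    function(p) if (is.null(p$type)) NA_character_ else p$type,
    character(1)
  )
}

# Hand-built stand-in for the response, so the helper runs without a cluster
example_mapping <- list(
  crime_data = list(mappings = list(properties = list(
    location = list(type = "geo_point"),
    datetime = list(type = "date", format = "yyyy-MM-dd HH:mm:ss")
  )))
)
types <- mapped_field_types(example_mapping, "crime_data")
print(types)

# With a live connection (not run here):
# types <- mapped_field_types(
#   elastic::mapping_get(es_connect, index = "crime_data"), "crime_data")
```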
Explore In Kibana
Once my first round of data was loaded, I checked out the Discover area in Kibana. I first had to modify the date range since the records align to the date/time from their start_date field. Once the time frame was wide enough I could see my uploaded data.
Once all the data has been uploaded, I can see the following from 2009-2020:
At this point I can query my data and create data visualizations and dashboards. Kibana is a great exploratory data analysis tool. I’ll display this more in a separate post.
Quick look at the data in Maps
In this section, I will briefly look at the data using Kibana’s map feature.
I can add and create layers based on my data. In the layer configuration I can specify many details about my layer.
Just to showcase, I’ll create a layer for homicides in the city.
Homicide Layer
Using the query search feature in the top bar, I filter the data using the field and value. This is great for querying and visualizing the data during exploration, but let’s say we want to create a layer instead. First we click on the “Add layer” button (right hand side).
First I’ll name the layer “homicides”. I can modify the “Tooltip fields” to display information about each point as you hover over or select a point on the map. In my case I just display the “OFFENSE” field value, which would display “HOMICIDE” when completed.
Next I will filter the data, just like I did in the top query search bar.
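The same field and value filter can be run from R to cross-check what the map layer shows. This is a minimal sketch under stated assumptions: the helper names are hypothetical, the query text OFFENSE:"HOMICIDE" is my reconstruction of the filter in query-string form, and the total is read from hits$total$value as reported by ElasticSearch 7.x (which by default reports totals above 10,000 as a lower bound).

```r
# Hypothetical helper: build a field:"value" query string
build_field_query <- function(field, value) {
  paste0(field, ':"', value, '"')
}

# Hypothetical helper: count the documents matching a field and value
count_matching <- function(conn, index, field, value) {
  result <- elastic::Search(
    conn,
    index = index,
    q = build_field_query(field, value),
    size = 0  # only the total is needed, not the documents themselves
  )
  # Assumption: ElasticSearch 7.x reports the total under hits$total$value
  result$hits$total$value
}

homicide_query <- build_field_query("OFFENSE", "HOMICIDE")
print(homicide_query)

# Usage (requires a running cluster):
# count_matching(es_connect, "crime_data", "OFFENSE", "HOMICIDE")
```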
Now I will modify the aesthetics of the data on the map. I am only modifying the fill color, but you can see that you can modify the icon, size, and other aesthetic dimensions.
Now we can see the whole city with the layer. Keep in mind I did not filter by year so this is all events in my dataset.
We can even zoom in to see the data at a finer granularity. Below you can see the tooltip showing the 12 homicide events mentioned in the data exploration post.
What’s Next?
This concludes the loading of the data from the PostgreSQL database into the NoSQL database, ElasticSearch, all from R.
We can continue to upload the rest of the data stored in PostgreSQL so we have our data in both database types. This would allow for good integration of data exploration processes. I will post those scripts as I get around to them in the associated GitHub repo (below).
Posts in Project Series
- Criminal Analysis: Planning
- Criminal Analysis: Data Search (part 0)
- Criminal Analysis: Data Search (part 1)
- Criminal Analysis: Data Search (part 2)
- Criminal Analysis: Data Search (part 3)
- Criminal Analysis: Data Storage
- Criminal Analysis: Data Storage (part 2)
- Criminal Analysis: Data Search (part 4)
- Derive a Star Schema By Example
- Criminal Analysis: Data Exploration
- Criminal Analysis: Data Exploration (part 1)
- Criminal Analysis: Data Exploration (part 2a)
- Criminal Analysis: Data Exploration (part 2b)
- Criminal Analysis: Data Storage (Part 3)