Dwi Wahyudi
Senior Software Engineer (Ruby, Golang, Java)
There are times when we need to develop a full-text search feature in our application, and ElasticSearch is usually our first option. In this article we’re going to use Golang to communicate with ElasticSearch’s essential features.
Overview
When we talk about ElasticSearch’s essential features, we may ask: what are they? Searching, of course, but ElasticSearch provides a different kind of search: full-text search over large amounts of semi-structured data. Unlike SQL databases, ElasticSearch is considered a NoSQL store. We can use SQL-like syntax (https://www.elastic.co/guide/en/elasticsearch/reference/current/sql-overview.html), but we normally don’t write queries like SELECT * FROM WHERE GROUP BY in ElasticSearch.
ElasticSearch can be used to store and search logs, and it can also do analytics (as part of the ELK stack). In this article, however, we’re going to build a feature that searches product data using Golang.
We must use each tool according to its capabilities and drawbacks. ElasticSearch searches fast because it is designed as a search and analytics tool, and its sharding, replication and clustering let it withstand failures, so it serves full-text search very well. For data consistency, though, core business data mostly stays in an RDBMS.
For example, ElasticSearch doesn’t have a JOIN like SQL (for filtering or aggregating across multiple tables), and it is quite hard to do this on the fly in ElasticSearch; we must denormalize the index (and reindex all of the data). RDBMSs like PostgreSQL and MySQL have constraints, transactions, locking and other features suited to keeping data consistent and free of race conditions.
PostgreSQL also has full-text search capability; we’ll cover this in a future article.
Implementation in Golang
The common pattern when using ElasticSearch is keeping its data (documents, in ElasticSearch terminology) in sync with the core database. Some frameworks, like https://github.com/ankane/searchkick (for Ruby on Rails), give us a complete solution out of the box for reindexing, searching, aggregating, etc. This framework involves some magic, though. Because it plugs into the Rails ORM (ActiveRecord), any creation, update or deletion in the database is automagically synced to ElasticSearch behind the curtain. Developers only need to define the JSON data to be indexed in each model: 1 Ruby on Rails model, 1 core database table, 1 ElasticSearch index.
In Golang we don’t get this out of the box; we need to do everything manually and explicitly. ElasticSearch provides a Golang client: https://github.com/elastic/go-elasticsearch. But in this article we’re going to make plain REST calls to the ElasticSearch server.
Let’s start with installing ElasticSearch:
docker run -d --name elasticsearch -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" elasticsearch:7.16.2
Let’s create repository-level code:
type (
	ElasticSearchService struct {
		baseURL    string
		esAuth     string
		env        string
		httpClient http.Client
	}
	// ...
)

func NewProductSearch(baseURL string, esAuth string, env string, httpClient http.Client) *ElasticSearchService {
	return &ElasticSearchService{
		baseURL:    baseURL,
		esAuth:     esAuth,
		env:        env,
		httpClient: httpClient,
	}
}
baseURL is the base URL of our ElasticSearch server, which is localhost:9200. esAuth is the authentication key used to communicate with the server (https://www.elastic.co/guide/en/elasticsearch/reference/current/http-clients.html). env is needed to separate index names by environment (dev, staging, production, team1, team2, etc).
Bulk Upsert Operation
This operation involves sending data to ElasticSearch. Upserting means that if a document already exists, we just update it; otherwise we create it. We’ll call the POST _bulk API provided by ElasticSearch.
https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
Let’s say this is how our product data is structured:
type (
	ProductBulkInsertRequest struct {
		ProductID       int64
		ProductName     string
		BasePrice       float64
		Price           float64
		MerchantID      int64
		MerchantName    string
		IsActiveProduct bool
	}
	//...
)
The signature for the method will be like this:
ProductUpsertDocBulk(ctx context.Context, param []ProductBulkInsertRequest) error
We’ll need to decide on the ElasticSearch index name first:
const (
	//...
	SHOP_PRODUCT_INDEX = "shop_products_%s"
	//...
)
We’re going to use Golang’s text templating library, and here’s the request body template:
PRODUCT_UPSERT_BULK_TEMPLATE = `{"index":{"_index":"{{.IndexName}}","_id":{{.ID}}}}
{"product_name":"{{.ProductName}}","base_price":{{.BasePrice}},"price":{{.Price}},"merchant_id":{{.MerchantID}},"merchant_name":"{{.MerchantName}}","is_active_product":{{.IsActiveProduct}}}
`
Let’s process this template, assuming that ess is an instance of ElasticSearchService:
indexNameWithEnv := fmt.Sprintf(SHOP_PRODUCT_INDEX, ess.env)
tmp := template.New("product_upsert")
tmp, err := tmp.Parse(PRODUCT_UPSERT_BULK_TEMPLATE)
if err != nil {
	logger.Error().Err(err).Msg("unable to parse request body text template")
	// Return the error itself; returning nil would hide the failure from the caller.
	return err
}
Then let’s create the request body from the template, assuming param is an instance of []ProductBulkInsertRequest:
bodyBytes := make([]byte, 0)
for _, eachBulkInsertRequest := range param {
	// Render one action line plus one document line per product.
	var bytesBuffer bytes.Buffer
	err = tmp.Execute(&bytesBuffer, BulkUpsertToTemplate{
		IndexName:       indexNameWithEnv,
		ID:              eachBulkInsertRequest.ProductID,
		ProductName:     eachBulkInsertRequest.ProductName,
		BasePrice:       eachBulkInsertRequest.BasePrice,
		Price:           eachBulkInsertRequest.Price,
		MerchantID:      eachBulkInsertRequest.MerchantID,
		MerchantName:    eachBulkInsertRequest.MerchantName,
		IsActiveProduct: eachBulkInsertRequest.IsActiveProduct,
	})
	if err != nil {
		logger.Error().Err(err).Msg("unable to execute request body text template")
		// Return the error itself; returning nil would hide the failure from the caller.
		return err
	}
	bodyBytes = append(bodyBytes, bytesBuffer.Bytes()...)
}
bodyBytes (an instance of []byte) here is the request body, ready to be sent to the _bulk POST API (fmt.Sprintf("%s/_bulk", ess.baseURL)).
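The _bulk API request body is not a single standard JSON document; it is newline-delimited JSON (NDJSON), where each line is its own JSON object and the last line must end with a newline. We can use it to bulk create, update and delete in 1 request. We can also create, update, get and delete a single document with the respective API: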
- Index API: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html
- Update API: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update.html
- Delete API: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete.html
- Get API: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-get.html
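One thing to keep in mind with _bulk is that the HTTP status can be 200 even when some of the items failed; the response body carries a top-level errors flag and one entry per item. Here is a minimal sketch of checking it. The names bulkResponse, bulkItem and failedBulkItems are hypothetical, and the response in main is a hand-written sample, not output captured from a real server.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// bulkResponse models only the parts of the _bulk response needed here.
// Each item is an object keyed by its action ("index", "create", "update", "delete").
type bulkResponse struct {
	Errors bool                  `json:"errors"`
	Items  []map[string]bulkItem `json:"items"`
}

type bulkItem struct {
	ID     string `json:"_id"`
	Status int    `json:"status"`
	Error  *struct {
		Type   string `json:"type"`
		Reason string `json:"reason"`
	} `json:"error"`
}

// failedBulkItems returns one message per item that ElasticSearch rejected.
func failedBulkItems(responseBody []byte) ([]string, error) {
	var resp bulkResponse
	if err := json.Unmarshal(responseBody, &resp); err != nil {
		return nil, fmt.Errorf("decode bulk response: %w", err)
	}
	if !resp.Errors {
		return nil, nil
	}
	var failures []string
	for _, item := range resp.Items {
		for action, result := range item {
			if result.Error != nil {
				failures = append(failures, fmt.Sprintf("%s %s: %s", action, result.ID, result.Error.Reason))
			}
		}
	}
	return failures, nil
}

func main() {
	// Hand-written sample response for illustration only.
	sample := []byte(`{"took":3,"errors":true,"items":[
		{"index":{"_id":"1","status":201}},
		{"index":{"_id":"2","status":400,"error":{"type":"mapper_parsing_exception","reason":"failed to parse"}}}
	]}`)
	failures, err := failedBulkItems(sample)
	if err != nil {
		panic(err)
	}
	fmt.Println(failures)
}
```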
Search Operation
This operation involves structuring the query syntax as a request body to be sent to ElasticSearch’s POST _search endpoint.
https://www.elastic.co/guide/en/elasticsearch/reference/current/search-search.html
We want to search the indexed documents by product and merchant name, with pagination.
ProductSearchRequest struct {
	ProductNameAndMerchantQuery string
	From                        int
	Size                        int
}
The result will be the same data we sent to ElasticSearch.
ProductSearchResult struct {
	From     int
	Size     int
	Products []ProductSearchResultDetail `json:"products"`
}
ProductSearchResultDetail struct {
	ProductID    int64   `json:"product_id"`
	ProductName  string  `json:"product_name"`
	BasePrice    float64 `json:"base_price"`
	Price        float64 `json:"price"`
	MerchantID   int64   `json:"merchant_id"`
	MerchantName string  `json:"merchant_name"`
}
The signature of our search method will be like this:
ProductSearch(ctx context.Context, param ProductSearchRequest) (result ProductSearchResult, err error)
Here’s the request body we’ll send to ElasticSearch:
PRODUCT_SEARCH_TEMPLATE = `{
  "from": {{.From}},
  "size": {{.Size}},
  "query": {
    "function_score": {
      "query": {
        "bool": {
          "must": {
            "match": {
              "product_name": "{{.Query}}"
            }
          },
          "filter": {
            "term": {
              "is_active_product": true
            }
          }
        }
      },
      "functions": [
        {
          "filter": {
            "match": {
              "merchant_name": "{{.Query}}"
            }
          },
          "weight": 10
        }
      ]
    }
  }
}`
ElasticSearch returns a score alongside each matched document, and documents are sorted by score by default. A higher score means the document is more relevant to the search query.
This query means: return all products whose product_name matches the query, matching on product_name only and nothing else, but if a product also has a matching merchant_name, prioritize it.
We use function_score here to modify the scores returned by ElasticSearch: https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-function-score-query.html
Let’s say the user queries “burger”. The query will certainly return products that have “burger” in the name; the more relevant the product, the higher its position. But if the merchant_name has “burger” in it, the product will be prioritized. If a merchant has “burger” in its name but no burger product, nothing will be returned from this merchant.
Boost and weight are different: boost is applied to specific values. For example, if we are sponsored by Joe’s Pizza, we may want to boost merchant Joe’s Pizza when the user queries “pizza”, “italian food”, etc.
We also can’t forget the pagination param above:
"from": {{.From}},
"size": {{.Size}},
We also only query documents with is_active_product: true. This field can be updated with either the _bulk API or the update API (https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update.html).
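Coming back to the from and size pagination parameters: callers usually think in page numbers, so here is a minimal sketch of converting a page number into from and size. pageToFromSize is a hypothetical helper; the limit of 10000 is ElasticSearch’s default index.max_result_window, which from + size may not exceed unless the setting is changed.

```go
package main

import "fmt"

// maxResultWindow is ElasticSearch's default index.max_result_window.
const maxResultWindow = 10000

// pageToFromSize converts a 1-based page number into the from/size pair
// expected by the _search request body.
func pageToFromSize(page, perPage int) (from, size int, err error) {
	if page < 1 || perPage < 1 {
		return 0, 0, fmt.Errorf("page and perPage must be >= 1, got %d and %d", page, perPage)
	}
	from = (page - 1) * perPage
	if from+perPage > maxResultWindow {
		return 0, 0, fmt.Errorf("from+size = %d exceeds max result window %d", from+perPage, maxResultWindow)
	}
	return from, perPage, nil
}

func main() {
	from, size, err := pageToFromSize(3, 20)
	if err != nil {
		panic(err)
	}
	fmt.Println(from, size)
}
```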
Let’s process the search query template:
q := param.ProductNameAndMerchantQuery
productDetails := make([]ProductSearchResultDetail, 0)
indexNameWithEnv := fmt.Sprintf(SHOP_PRODUCT_INDEX, ess.env)
tmp := template.New("product_search")
tmp, err = tmp.Parse(PRODUCT_SEARCH_TEMPLATE)
if err != nil {
	logger.Error().Err(err).Msg("unable to parse request body text template")
	return
}
var bytesBuffer bytes.Buffer
err = tmp.Execute(&bytesBuffer, ProductQueryToTemplate{Query: q, From: param.From, Size: param.Size})
if err != nil {
	logger.Error().Err(err).Msg("unable to execute request body text template")
	return
}
bytesBuffer here is the request body we intend to send to the _search endpoint (fmt.Sprintf("%s/%s/_search", ess.baseURL, indexNameWithEnv)).
Here’s what the ElasticSearch response looks like as a Golang struct:
ESPResponse struct {
	Took     int  `json:"took"`
	TimedOut bool `json:"timed_out"`
	Shards   struct {
		Total      int `json:"total"`
		Successful int `json:"successful"`
		Skipped    int `json:"skipped"`
		Failed     int `json:"failed"`
	} `json:"_shards"`
	Hits struct {
		Total struct {
			Value    int    `json:"value"`
			Relation string `json:"relation"`
		} `json:"total"`
		MaxScore float64 `json:"max_score"`
		Hits     []struct {
			Index  string  `json:"_index"`
			Type   string  `json:"_type"`
			ID     string  `json:"_id"`
			Score  float64 `json:"_score"`
			Source struct {
				ProductName  string  `json:"product_name"`
				BasePrice    float64 `json:"base_price"`
				Price        float64 `json:"price"`
				MerchantID   int64   `json:"merchant_id"`
				MerchantName string  `json:"merchant_name"`
			} `json:"_source"`
		} `json:"hits"`
	} `json:"hits"`
}
We can analyze many things here, but our focus is the Source struct. We can then easily transform the hits like this:
var espResp ESPResponse
err = json.Unmarshal([]byte(responseBody), &espResp)
if err != nil {
	logger.Error().Err(err).Str("response_body", string(responseBody)).Msg("unable to unmarshal response body")
	return
}
for _, eachHit := range espResp.Hits.Hits {
	source := eachHit.Source
	// The document _id comes back as a string; we indexed it from the numeric product ID.
	productID, err := strconv.ParseInt(eachHit.ID, 10, 64)
	if err != nil {
		logger.Error().Err(err).Str("document_id", eachHit.ID).Msg("unable to parse document ID as product ID")
		return result, err
	}
	eachProductSearchResult := ProductSearchResultDetail{
		ProductID:    productID,
		ProductName:  source.ProductName,
		BasePrice:    source.BasePrice,
		Price:        source.Price,
		MerchantID:   source.MerchantID,
		MerchantName: source.MerchantName,
	}
	productDetails = append(productDetails, eachProductSearchResult)
}
productDetails is ready to be returned from the method.
Autocomplete Operation
Autocomplete also involves structuring the query syntax as a request body to be sent to ElasticSearch’s POST _search endpoint.
https://www.elastic.co/guide/en/elasticsearch/reference/current/search-search.html
This is almost the same as the search operation above, except that the goal is to give suggestions to the user. For example, we have these product documents: “Chocolate Premium Ultra Cake”, “Chocolate Milk”, “Espresso Chocolata”, “Pizza Choco Small”.
When the user types “choc” (as the query), all 4 of those entries will be suggested.
Here’s the request struct:
AutoCompleteRequest struct {
	Query string
	From  int
	Size  int
}
The signature of our autocomplete method will be like this:
AutoComplete(ctx context.Context, param AutoCompleteRequest) (result ProductSearchResult, err error)
We created the upsert method above relying on dynamic mapping in ElasticSearch, meaning that we didn’t need to explicitly define a mapping for the index.
But in order to build an autocomplete feature, we must map the product_name field to the search_as_you_type type. There are actually many types of suggesters available in ElasticSearch (https://www.elastic.co/guide/en/elasticsearch/reference/current/search-suggesters.html), but we’re going to use search_as_you_type (https://www.elastic.co/guide/en/elasticsearch/reference/current/search-as-you-type.html).
We’ll need to define this mapping first by creating the index with it. Because the type of an existing field cannot be changed in place, this must happen before any documents are indexed (an existing index has to be recreated and reindexed). We can do this with curl, Postman or programmatically with any programming language:
PUT /shop_products_prod
{
  "mappings": {
    "properties": {
      "product_name": {
        "type": "search_as_you_type"
      }
    }
  }
}
ElasticSearch will then index the product_name field in a way that is optimized for autocompletion. For a document with the product name Chocolate Premium Ultra Cake, it will create these fields:
- product_name: analyzed with the field’s analyzer (the default one unless configured otherwise).
- product_name._2gram: splits the text into 2-word shingles (word n-grams), like this: “Chocolate Premium”, “Premium Ultra”, “Ultra Cake”.
- product_name._3gram: splits the text into 3-word shingles, like this: “Chocolate Premium Ultra”, “Premium Ultra Cake”.
- product_name._index_prefix: basically splits the terms into prefixes, like this: “C”, “Ch”, “Cho”, etc.
Here’s the request body template:
PRODUCT_AUTOCOMPLETE_TEMPLATE = `{
  "from": {{.From}},
  "size": {{.Size}},
  "query": {
    "bool": {
      "must": {
        "multi_match": {
          "query": "{{.Query}}",
          "type": "bool_prefix",
          "fields": [
            "product_name",
            "product_name._2gram",
            "product_name._3gram"
          ]
        }
      },
      "filter": {
        "term": {
          "is_active_product": true
        }
      }
    }
  }
}`
Process the template:
// The index name is needed for the _search URL, same as in the search operation.
indexNameWithEnv := fmt.Sprintf(SHOP_PRODUCT_INDEX, ess.env)
tmp := template.New("product_autocomplete")
tmp, err = tmp.Parse(PRODUCT_AUTOCOMPLETE_TEMPLATE)
if err != nil {
	logger.Error().Err(err).Msg("unable to parse request body text template")
	return
}
var bytesBuffer bytes.Buffer
err = tmp.Execute(&bytesBuffer, AutoCompleteRequest{Query: param.Query, From: param.From, Size: param.Size})
if err != nil {
	logger.Error().Err(err).Msg("unable to execute request body text template")
	return
}
bytesBuffer here is the request body we intend to send to the _search endpoint (fmt.Sprintf("%s/%s/_search", ess.baseURL, indexNameWithEnv)).
We expect ElasticSearch to respond with the ESPResponse structure above, which we can then process in Golang before returning the results to the user.