Dwi Wahyudi

Senior Software Engineer (Ruby, Golang, Java)


There are times when we need to build a full-text search feature into our application, and ElasticSearch is usually our first option. In this article we’re going to use Golang to work with ElasticSearch’s essential features.

Overview

When we talk about ElasticSearch’s essential features, we may ask: what are they? Searching, of course, but ElasticSearch provides a different kind of search: full-text search over large amounts of semi-structured data. Unlike a relational database, ElasticSearch is considered a NoSQL data store. We can use SQL-like syntax with it (https://www.elastic.co/guide/en/elasticsearch/reference/current/sql-overview.html), but we normally don’t write queries of the SELECT * FROM WHERE GROUP BY kind in ElasticSearch.

ElasticSearch can be used to store and search logs, and it can do analytics as well (as part of the ELK stack). In this article, though, we’re going to build a product search feature using Golang.

We should use each tool according to its strengths and drawbacks. ElasticSearch searches fast because it is designed as a search and analytics engine, and its sharding, replication and clustering let it withstand failures, so it serves full-text search very well. For data consistency, however, core business data mostly stays in an RDBMS.

For example, ElasticSearch doesn’t have JOIN like SQL does (for filtering or aggregating across multiple tables), and it is quite hard to do the equivalent on the fly. We must denormalize the data into the index instead (and reindex the whole data set). RDBMSs like PostgreSQL and MySQL have constraints, transactions, locking and other features that are suitable for keeping data consistent and free of race conditions.

PostgreSQL also has full-text search capability; we’ll cover this in a future article.

Implementation in Golang

The common pattern when using ElasticSearch is keeping its data (documents, in ElasticSearch terminology) in sync with the core database. Some frameworks, like https://github.com/ankane/searchkick (for Ruby on Rails), give us a complete solution out of the box for reindexing, searching, aggregating, etc. This framework involves some magic though: because it plugs into the Rails ORM (ActiveRecord), any creation, update or deletion in the database is automagically synced to ElasticSearch behind the curtain. Developers only need to define the JSON data to be indexed in each model: 1 Ruby on Rails model, 1 core database table, 1 ElasticSearch index.

In Golang, we don’t get this kind of magic; we need to do the whole thing manually and explicitly. ElasticSearch provides an official Golang client: https://github.com/elastic/go-elasticsearch. But in this article we’re going to make plain REST calls to the ElasticSearch server.

Let’s start with installing ElasticSearch:

docker run -d --name elasticsearch -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" elasticsearch:7.16.2

Let’s create repository-level code:

type (
	ElasticSearchService struct {
		baseURL    string
		esAuth     string
		env        string
		httpClient http.Client
	}
  // ...
)

func NewProductSearch(baseURL string, esAuth string, env string, httpClient http.Client) *ElasticSearchService {
	return &ElasticSearchService{
		baseURL:    baseURL,
		esAuth:     esAuth,
		env:        env,
		httpClient: httpClient,
	}
}

Bulk Upsert Operation

This operation sends data to ElasticSearch. Upserting means that if a document with the same ID already exists, it is updated (with the index action used here, it is replaced by the new version); otherwise it is created. We’ll call the POST _bulk API provided by ElasticSearch.

https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html

Let’s say this is how our product data is structured:

type (
	ProductBulkInsertRequest struct {
		ProductID       int64
		ProductName     string
		BasePrice       float64
		Price           float64
		MerchantID      int64
		MerchantName    string
		IsActiveProduct bool
	}
  //...
)

The signature for the method will be like this:

ProductUpsertDocBulk(ctx context.Context, param []ProductBulkInsertRequest) error

We’ll need to decide on the ElasticSearch index name first:

const(
  //...
  SHOP_PRODUCT_INDEX = "shop_products_%s"
  //...
)

We’re going to use Golang’s text templating library (text/template), and here’s the request body template:

	// Each JSON object sits on its own line and the body ends with a newline,
	// so the template has no indentation or trailing whitespace.
	PRODUCT_UPSERT_BULK_TEMPLATE = `{"index":{"_index":"{{.IndexName}}","_id":{{.ID}}}}
{"product_name":"{{.ProductName}}","base_price":{{.BasePrice}},"price":{{.Price}},"merchant_id":{{.MerchantID}},"merchant_name":"{{.MerchantName}}","is_active_product":{{.IsActiveProduct}}}
`
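
One caveat with this template: text/template writes strings verbatim, so a product name containing a double quote or a backslash would produce invalid JSON. As a hedged alternative (not what the rest of this article uses), the same two lines per product can be produced with encoding/json, which escapes values for us. The names productDoc, bulkItem, indexAction and buildBulkBody are made up for this sketch, and _id is sent as a string here:

```go
package main

import (
	"bytes"
	"encoding/json"
	"strconv"
)

// productDoc mirrors the document line of the template above.
type productDoc struct {
	ProductName     string  `json:"product_name"`
	BasePrice       float64 `json:"base_price"`
	Price           float64 `json:"price"`
	MerchantID      int64   `json:"merchant_id"`
	MerchantName    string  `json:"merchant_name"`
	IsActiveProduct bool    `json:"is_active_product"`
}

// bulkItem pairs a document with the ID it is indexed under
// (hypothetical helper type for this sketch).
type bulkItem struct {
	ID  int64
	Doc productDoc
}

// indexAction is the metadata line: {"index":{"_index":"...","_id":"..."}}.
type indexAction struct {
	Index struct {
		Index string `json:"_index"`
		ID    string `json:"_id"`
	} `json:"index"`
}

// buildBulkBody writes one action line and one document line per item.
// json.Encoder.Encode appends "\n" after every value, which yields the
// newline-delimited format that _bulk expects.
func buildBulkBody(indexName string, items []bulkItem) ([]byte, error) {
	var buf bytes.Buffer
	enc := json.NewEncoder(&buf)
	for _, item := range items {
		var action indexAction
		action.Index.Index = indexName
		action.Index.ID = strconv.FormatInt(item.ID, 10)
		if err := enc.Encode(action); err != nil {
			return nil, err
		}
		if err := enc.Encode(item.Doc); err != nil {
			return nil, err
		}
	}
	return buf.Bytes(), nil
}
```

The trade-off is that the request shape now lives in Go structs instead of a readable template string, but quotes in product or merchant names can no longer break the request.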

Let’s process this template, assuming that ess is an instance of ElasticSearchService:

	indexNameWithEnv := fmt.Sprintf(SHOP_PRODUCT_INDEX, ess.env)
	tmp := template.New("product_upsert")
	tmp, err := tmp.Parse(PRODUCT_UPSERT_BULK_TEMPLATE)
	if err != nil {
		logger.Error().Err(err).Msg("unable to parse request body text template")
		// Return the error instead of nil so the caller knows the upsert failed.
		return err
	}

Then let’s create the request body from the template, assuming param is an instance of []ProductBulkInsertRequest (BulkUpsertToTemplate is a plain struct holding the template fields):

  bodyBytes := make([]byte, 0)

  for _, eachBulkInsertRequest := range param {
    var bytesBuffer bytes.Buffer

    err = tmp.Execute(&bytesBuffer, BulkUpsertToTemplate{
      IndexName:       indexNameWithEnv,
      ID:              eachBulkInsertRequest.ProductID,
      ProductName:     eachBulkInsertRequest.ProductName,
      BasePrice:       eachBulkInsertRequest.BasePrice,
      Price:           eachBulkInsertRequest.Price,
      MerchantID:      eachBulkInsertRequest.MerchantID,
      MerchantName:    eachBulkInsertRequest.MerchantName,
      IsActiveProduct: eachBulkInsertRequest.IsActiveProduct,
    })
    if err != nil {
      logger.Error().Err(err).Msg("unable to execute request body text template")
      // Return the error instead of nil so the caller knows the upsert failed.
      return err
    }

    bodyBytes = append(bodyBytes, bytesBuffer.Bytes()...)
  }

bodyBytes (a []byte) is now the request body, ready to be sent to the _bulk POST API (fmt.Sprintf("%s/_bulk", ess.baseURL)).
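
Sending the body could look roughly like the sketch below. Two things here are assumptions, not something shown in this article: esAuth is treated as a ready-made Authorization header value, and postBulk and bulkHasErrors are hypothetical helper names. One detail worth handling is that _bulk can answer with HTTP 200 even when individual items fail, in which case the response body carries "errors": true.

```go
package main

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
)

// bulkHasErrors reports whether a _bulk response body flags item-level failures.
func bulkHasErrors(respBody []byte) (bool, error) {
	var parsed struct {
		Errors bool `json:"errors"`
	}
	if err := json.Unmarshal(respBody, &parsed); err != nil {
		return false, err
	}
	return parsed.Errors, nil
}

// postBulk sends an NDJSON body to {baseURL}/_bulk and checks both the HTTP
// status and the "errors" flag of the response.
func postBulk(ctx context.Context, client *http.Client, baseURL, authHeader string, body []byte) error {
	url := fmt.Sprintf("%s/_bulk", baseURL)
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/x-ndjson")
	if authHeader != "" {
		// Assumption: authHeader is a complete value such as "Basic ...".
		req.Header.Set("Authorization", authHeader)
	}

	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	respBody, err := io.ReadAll(resp.Body)
	if err != nil {
		return err
	}
	if resp.StatusCode >= 300 {
		return fmt.Errorf("bulk request failed with status %d: %s", resp.StatusCode, respBody)
	}

	failed, err := bulkHasErrors(respBody)
	if err != nil {
		return err
	}
	if failed {
		return fmt.Errorf("bulk request reported item errors: %s", respBody)
	}
	return nil
}
```

Inside ProductUpsertDocBulk this would be called with something like postBulk(ctx, &ess.httpClient, ess.baseURL, ess.esAuth, bodyBytes).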

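The _bulk API request body is not a single standard JSON document: it is newline-delimited JSON (NDJSON), where each action line is followed by its document line. We can use it to create, update and delete many documents in 1 request. We can also create, update, get and delete a single document with the respective document APIs.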

Search Operation

This operation involves structuring the query as a request body to be sent to ElasticSearch’s POST _search API.

https://www.elastic.co/guide/en/elasticsearch/reference/current/search-search.html

We want to search the indexed documents by product and merchant name, with pagination.

	ProductSearchRequest struct {
		ProductNameAndMerchantQuery string
		From                        int
		Size                        int
	}
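
From and Size map directly to the from and size parameters of _search. If the UI works with page numbers instead, a small conversion helper is handy. The sketch below uses a hypothetical pageToFrom function and guards against ElasticSearch’s default index.max_result_window of 10000, beyond which from + size requests are rejected:

```go
package main

import "fmt"

// defaultMaxResultWindow is ElasticSearch's default index.max_result_window.
const defaultMaxResultWindow = 10000

// pageToFrom converts a 1-based page number and a page size into the
// "from" offset expected by _search.
func pageToFrom(page, size int) (int, error) {
	if page < 1 || size < 1 {
		return 0, fmt.Errorf("page and size must be positive, got page=%d size=%d", page, size)
	}
	from := (page - 1) * size
	if from+size > defaultMaxResultWindow {
		return 0, fmt.Errorf("from+size=%d exceeds the result window of %d", from+size, defaultMaxResultWindow)
	}
	return from, nil
}
```

For example, page 3 with 20 items per page gives From: 40 and Size: 20.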

The result will contain the same data we sent to ElasticSearch.

  ProductSearchResult struct {
    From     int
    Size     int
    Products []ProductSearchResultDetail `json:"products"`
  }

  ProductSearchResultDetail struct {
    ProductID    int64   `json:"product_id"`
    ProductName  string  `json:"product_name"`
    BasePrice    float64 `json:"base_price"`
    Price        float64 `json:"price"`
    MerchantID   int64   `json:"merchant_id"`
    MerchantName string  `json:"merchant_name"`
  }

The signature of our search method will be like this:

ProductSearch(ctx context.Context, param ProductSearchRequest) (result ProductSearchResult, err error)

Here’s the request body we’ll send to ElasticSearch:

	PRODUCT_SEARCH_TEMPLATE = `{
	"from": {{.From}},
	"size": {{.Size}},
	"query": {
		"function_score": {
			"query": {
				"bool": {
					"must": {
						"match": {
							"product_name": "{{.Query}}"
						}
					},
					"filter": {
						"term": {
							"is_active_product": true
						}
					}
				}
			},
			"functions": [
				{
					"filter": {
						"match": {
							"merchant_name": "{{.Query}}"
						}
					},
					"weight": 10
				}
			]
		}
	}
}`

ElasticSearch returns a score alongside each matched document, and documents are sorted by score by default. A higher score means the document is more relevant to the search query.

This query means: return all active products whose product_name matches the query. Products are matched by product_name only, but if a matching product also has a matching merchant_name, it is prioritized.
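
The same request body can also be assembled from nested maps and serialized with encoding/json, which keeps the structure (must, filter, functions) visible in Go and escapes whatever the user typed. This is only a sketch: buildProductSearchBody and the obj alias are names invented here, not part of the code in this article.

```go
package main

import "encoding/json"

// obj is shorthand for a JSON object.
type obj = map[string]interface{}

// buildProductSearchBody builds the function_score query described above:
// match on product_name, filter on is_active_product, and multiply the
// score by 10 when merchant_name matches as well.
func buildProductSearchBody(query string, from, size int) ([]byte, error) {
	body := obj{
		"from": from,
		"size": size,
		"query": obj{
			"function_score": obj{
				"query": obj{
					"bool": obj{
						"must":   obj{"match": obj{"product_name": query}},
						"filter": obj{"term": obj{"is_active_product": true}},
					},
				},
				"functions": []obj{
					{
						"filter": obj{"match": obj{"merchant_name": query}},
						"weight": 10,
					},
				},
			},
		},
	}
	// json.Marshal writes map keys in sorted order, so the key order differs
	// from the template, which does not matter to ElasticSearch.
	return json.Marshal(body)
}
```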

We use function_score here to modify the scores returned by ElasticSearch. https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-function-score-query.html

Let’s say a user searches for “burger”. The query will certainly return products that have “burger” in their name; the more relevant the product, the higher its position. But if the merchant_name also has “burger” in it, the product is prioritized. If a merchant has “burger” in its name but sells no burger product, nothing is returned from this merchant.

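Boost and weight are different. In the query above, weight is the multiplier that function_score applies to the score of documents matching the function’s filter. Boost is set on an individual query clause, so it can favor specific values. An example would be that we are sponsored by Joe’s Pizza, so we want to boost the merchant Joe’s Pizza whenever a user searches for “pizza”, “italian food”, etc.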

We also can’t forget the pagination param above:

	"from": {{.From}},
	"size": {{.Size}},

We also filter on is_active_product: true, so only active products are returned. This flag can be updated with either the _bulk API or the update API (https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update.html).
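
As a sketch of the _bulk route: an update action line followed by a partial doc line changes only the listed fields and leaves the rest of the document untouched. buildSetActiveBulk and its helper types are names invented for this example, and _id is sent as a string:

```go
package main

import (
	"bytes"
	"encoding/json"
	"strconv"
)

// updateAction is the metadata line: {"update":{"_index":"...","_id":"..."}}.
type updateAction struct {
	Update struct {
		Index string `json:"_index"`
		ID    string `json:"_id"`
	} `json:"update"`
}

// partialDoc is the line that follows: {"doc":{"is_active_product":...}}.
type partialDoc struct {
	Doc struct {
		IsActiveProduct bool `json:"is_active_product"`
	} `json:"doc"`
}

// buildSetActiveBulk builds a _bulk body that sets is_active_product
// for every given product ID.
func buildSetActiveBulk(indexName string, productIDs []int64, active bool) ([]byte, error) {
	var buf bytes.Buffer
	enc := json.NewEncoder(&buf) // Encode appends "\n" after each value
	for _, id := range productIDs {
		var action updateAction
		action.Update.Index = indexName
		action.Update.ID = strconv.FormatInt(id, 10)
		if err := enc.Encode(action); err != nil {
			return nil, err
		}
		var doc partialDoc
		doc.Doc.IsActiveProduct = active
		if err := enc.Encode(doc); err != nil {
			return nil, err
		}
	}
	return buf.Bytes(), nil
}
```

The resulting bytes are sent to the same _bulk endpoint as the upsert body.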

Let’s process the search query template:

	q := param.ProductNameAndMerchantQuery

	productDetails := make([]ProductSearchResultDetail, 0)
	indexNameWithEnv := fmt.Sprintf(SHOP_PRODUCT_INDEX, ess.env)

	tmp := template.New("product_search")
	tmp, err = tmp.Parse(PRODUCT_SEARCH_TEMPLATE)
	if err != nil {
		logger.Error().Err(err).Msg("unable to parse request body text template")
		return
	}

	var bytesBuffer bytes.Buffer
	err = tmp.Execute(&bytesBuffer, ProductQueryToTemplate{Query: q, From: param.From, Size: param.Size})
	if err != nil {
		logger.Error().Err(err).Msg("unable to execute request body text template")
		return
	}

bytesBuffer now holds the request body we intend to send to the _search endpoint (fmt.Sprintf("%s/%s/_search", ess.baseURL, indexNameWithEnv)).

Here’s what the ElasticSearch response looks like as a Golang struct:

	ESPResponse struct {
		Took     int  `json:"took"`
		TimedOut bool `json:"timed_out"`
		Shards   struct {
			Total      int `json:"total"`
			Successful int `json:"successful"`
			Skipped    int `json:"skipped"`
			Failed     int `json:"failed"`
		} `json:"_shards"`
		Hits struct {
			Total struct {
				Value    int    `json:"value"`
				Relation string `json:"relation"`
			} `json:"total"`
			MaxScore float64 `json:"max_score"`
			Hits     []struct {
				Index  string  `json:"_index"`
				Type   string  `json:"_type"`
				ID     string  `json:"_id"`
				Score  float64 `json:"_score"`
				Source struct {
					ProductName  string  `json:"product_name"`
					BasePrice    float64 `json:"base_price"`
					Price        float64 `json:"price"`
					MerchantID   int64   `json:"merchant_id"`
					MerchantName string  `json:"merchant_name"`
				} `json:"_source"`
			} `json:"hits"`
		} `json:"hits"`
	}

We can analyze many things here, but our focus is the Source struct. We can then easily transform the hits like this:

	var espResp ESPResponse
	err = json.Unmarshal([]byte(responseBody), &espResp)
	if err != nil {
		logger.Error().Err(err).Str("response_body", string(responseBody)).Msg("unable to unmarshal response body")
		return
	}

	for _, eachHit := range espResp.Hits.Hits {
		source := eachHit.Source

		// The document _id comes back as a string; convert it back to the numeric product ID.
		productID, err := strconv.ParseInt(eachHit.ID, 10, 64)
		if err != nil {
			logger.Error().Err(err).Str("document_id", eachHit.ID).Msg("unable to parse document ID as product ID")
			return result, err
		}

		eachProductSearchResult := ProductSearchResultDetail{
			ProductID:    productID,
			ProductName:  source.ProductName,
			BasePrice:    source.BasePrice,
			Price:        source.Price,
			MerchantID:   source.MerchantID,
			MerchantName: source.MerchantName,
		}
		productDetails = append(productDetails, eachProductSearchResult)
	}

productDetails is ready to be returned from the method.

Autocomplete Operation

The autocomplete operation also involves structuring the query as a request body to be sent to ElasticSearch’s POST _search API.

https://www.elastic.co/guide/en/elasticsearch/reference/current/search-search.html

This is similar to the search operation above, except that the goal is to give suggestions to the user. For example, say we have these product documents: “Chocolate Premium Ultra Cake”, “Chocolate Milk”, “Espresso Chocolata”, “Pizza Choco Small”.

When a user types “choc” (as the query), all 4 of those entries will be suggested.

Here’s the request struct:

	AutoCompleteRequest struct {
		Query string
		From  int
		Size  int
	}

The signature of our autocomplete method will look like this:

AutoComplete(ctx context.Context, param AutoCompleteRequest) (result ProductSearchResult, err error)

We created the upsert method above relying on dynamic mapping in ElasticSearch, meaning that we didn’t need to explicitly define a mapping for the index.

But in order to create an autocomplete feature, we must map the product_name field to use search_as_you_type type. There are actually many types of suggesters available in ElasticSearch (https://www.elastic.co/guide/en/elasticsearch/reference/current/search-suggesters.html), but we’re going to use search_as_you_type (https://www.elastic.co/guide/en/elasticsearch/reference/current/search-as-you-type.html).

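We need to define this mapping when the index is created, by calling the create index API. The type of an existing field can’t be changed afterwards, so if the index already exists with a dynamically mapped product_name, we need to create a new index with this mapping and reindex the data. We can send the request with curl, Postman or programmatically with any programming language: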

PUT /shop_products_prod

{
  "mappings": {
    "properties": {
      "product_name": {
        "type": "search_as_you_type"
      }
    }
  }
}
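
Done programmatically in Golang, the call could look like the sketch below. The helper names productMappingBody and createProductIndex are made up for this example and authentication is left out. ElasticSearch rejects the request if the index already exists, which surfaces here as a non-200 status.

```go
package main

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
)

// obj is shorthand for a JSON object.
type obj = map[string]interface{}

// productMappingBody returns the create-index body shown above.
func productMappingBody() ([]byte, error) {
	return json.Marshal(obj{
		"mappings": obj{
			"properties": obj{
				"product_name": obj{"type": "search_as_you_type"},
			},
		},
	})
}

// createProductIndex sends PUT {baseURL}/{indexName} with the mapping body.
func createProductIndex(ctx context.Context, client *http.Client, baseURL, indexName string) error {
	body, err := productMappingBody()
	if err != nil {
		return err
	}
	url := fmt.Sprintf("%s/%s", baseURL, indexName)
	req, err := http.NewRequestWithContext(ctx, http.MethodPut, url, bytes.NewReader(body))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		// Includes the "index already exists" case.
		respBody, _ := io.ReadAll(resp.Body)
		return fmt.Errorf("create index failed with status %d: %s", resp.StatusCode, respBody)
	}
	return nil
}
```

Since the index name depends on the environment, the caller would pass fmt.Sprintf(SHOP_PRODUCT_INDEX, env) as indexName.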

ElasticSearch will then index the product_name field in a way that is optimized for autocompletion. For a document with the product name Chocolate Premium Ultra Cake, it maintains these fields (with the default standard analyzer, terms are lowercased):

  • product_name: analyzed with the field’s analyzer (the standard analyzer by default).
  • product_name._2gram: the text is split into shingles (word n-grams) of 2 words, like this: “chocolate premium”, “premium ultra”, “ultra cake”.
  • product_name._3gram: the text is split into shingles of 3 words, like this: “chocolate premium ultra”, “premium ultra cake”.
  • product_name._index_prefix: edge n-grams (prefixes) built on top of the _3gram analysis, like this: “c”, “ch”, “cho”, etc.

Here’s the request body template:

	PRODUCT_AUTOCOMPLETE_TEMPLATE = `{
    "from": {{.From}},
    "size": {{.Size}},
    "query": {
      "bool":{
        "must":{
          "multi_match": {
            "query": "{{.Query}}",
            "type": "bool_prefix",
            "fields": [
              "product_name",
              "product_name._2gram",
              "product_name._3gram"
            ]
          }
        },
        "filter":{
          "term":{
            "is_active_product":true
          }
        }
      }
    }
  }`

Process the template:

	tmp := template.New("product_autocomplete")
	tmp, err = tmp.Parse(PRODUCT_AUTOCOMPLETE_TEMPLATE)
	if err != nil {
		logger.Error().Err(err).Msg("unable to parse request body text template")
		return
	}
	var bytesBuffer bytes.Buffer
	err = tmp.Execute(&bytesBuffer, AutoCompleteRequest{Query: param.Query, From: param.From, Size: param.Size})
	if err != nil {
		logger.Error().Err(err).Msg("unable to execute request body text template")
		return
	}

bytesBuffer now holds the request body we intend to send to the _search endpoint (fmt.Sprintf("%s/%s/_search", ess.baseURL, indexNameWithEnv)).

We expect ElasticSearch to respond with the ESPResponse structure above, which we can then process in Golang before returning the results to the user.