Dwi Wahyudi

Senior Software Engineer (Ruby, Golang, Java)


In this article, we’re going to look at one of the most under-utilized design patterns in software engineering: the observer pattern.

Overview

The observer pattern is one of the behavioral design patterns. It lets us define a simple event subscription mechanism. With a simple setup, we can make a collection of objects/services (mostly inside the same instance/application) listen to an event and take action when that event happens.

https://en.wikipedia.org/wiki/Observer_pattern

This pattern is useful when there’s an event and we want multiple follow-up actions without changing the event code itself, which keeps the mechanism extensible. For example, let’s say there’s an order complete event in our code, and we want to do a lot of things when it happens:

  • Send the invoice via email,
  • Send the order data to marketing tools,
  • Send push-notifications to user, admin and merchant/seller,
  • etc

If we’re not careful, we’ll just write all 3 (or more) of those actions inside the same method/function/class as the order completion code, whether or not they run in goroutines, which is a sure way to hurt the health of the codebase. Every follow-up action we add makes the order completion function more complex.

One way to solve this is to use the observer pattern. We make each of those 3 (or more) actions a subscriber/listener of that event. When the event happens, the event code calls a notify method, and that notify method simply calls each subscriber to do its action.
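The flow described above can be sketched in a few lines of Go. This is only an illustration under assumptions: OrderObserver, OrderEvent, recorder and completeOrder are hypothetical names invented for this sketch, and the subscribers only record what they would do instead of sending real emails or notifications.

```go
package main

import "fmt"

// OrderObserver is a hypothetical subscriber contract for this sketch.
type OrderObserver interface {
	OnOrderCompleted(orderID string)
}

// OrderEvent holds the subscribers, which are known up front.
type OrderEvent struct {
	observers []OrderObserver
}

// Notify calls each subscriber in registration order.
func (e *OrderEvent) Notify(orderID string) {
	for _, o := range e.observers {
		o.OnOrderCompleted(orderID)
	}
}

// recorder is a stand-in subscriber that records what it would do.
type recorder struct {
	action string
	log    *[]string
}

func (r recorder) OnOrderCompleted(orderID string) {
	*r.log = append(*r.log, r.action+" for "+orderID)
}

// completeOrder is the event code; it only knows about Notify.
func completeOrder(e *OrderEvent, orderID string) {
	// ... mark the order as complete ...
	e.Notify(orderID)
}

// runOrderDemo wires three subscribers and completes one order.
func runOrderDemo() []string {
	var log []string
	e := &OrderEvent{observers: []OrderObserver{
		recorder{"send invoice email", &log},
		recorder{"send data to marketing tools", &log},
		recorder{"send push notifications", &log},
	}}
	completeOrder(e, "order-1")
	return log
}

func main() {
	for _, line := range runOrderDemo() {
		fmt.Println(line)
	}
}
```

Adding a fourth action here means adding one more entry to the observers slice; completeOrder stays untouched.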

With the observer pattern, the event knows all of its observers beforehand, which couples the event to its subscribers. And because the event and the subscribers run in the same application instance, this might create scalability problems down the road (both are handled by the same instance).

Most developers will use pub-sub with message brokers like Kafka and RabbitMQ. The observer pattern is a good fit if:

  • We’re sure that the instance can handle both the event and the subscribers. We’ll make sure that each subscriber is light and small enough to run.
  • We want to strictly specify the observers beforehand. With message brokers, publishers and subscribers are loosely coupled and don’t know of each other’s existence.
  • We don’t want to involve network calls, which avoids the latency and security issues of network communication between the app/publisher, the message broker and the subscribers.

If any observer needs to do heavy lifting, we can create a separate service for it with the help of a message broker. The transition will be easy because publishing to the message broker can itself be one of the observer pattern subscribers: the order completion event notifies a subscriber that publishes to the broker. This is mostly useful if we want the event code to stay clean/concise while balancing budget and needs; the observer pattern smooths out the transition between the two.

  • Not ready yet to roll out a new service for subscribers? Use the observer pattern to clean up the core (order completion) code.
  • One subscriber is getting heavy and needs a new service? Deploy the new service, deploy the message broker, remove that subscriber from the observer pattern subscribers, and create a new observer pattern subscriber that publishes to the message broker.

We’ve already covered message brokers, notably RabbitMQ.

Implementation in Golang

The Event Package

Let’s say we want to create a transfer event (moving money from one account to another) in an application. There will be some actions as subscribers, for example:

  • Send the transfer detail email to user,
  • Update the user’s transfer rank,
  • Send the transfer data to marketing tools.

The implementation is simple. First, create a new package, event, which should contain event code only. Then create a new file, balance_event.go, for any event related to balance (amount of money). Inside balance_event.go:

  • We’ll create the BalanceEvent interface and its TransferNotify method. This is the method that will be called by the event (transfer code).
    • There’s also balanceEvent as the implementation. It contains transferObservers []TransferObserver, the observers/subscribers.
  • We’ll also create the TransferObserver interface and its Update method, which is the method subscribers implement.
  • When balanceEvent’s TransferNotify is called by the event, it calls the Update method of each of its transferObservers. This is a simple implementation. If we want to call each Update in a goroutine, we may want to create another implementation, something like (b *asyncBalanceEvent) TransferNotify. We can further enhance this simple implementation by adding things like logging and tracing.

package event

import (
	"context"
	"sample-wallet/entity"
)

type (
	TransferObserver interface {
		Update(ctx context.Context, transferEvent entity.TransferEvent) error
	}

	BalanceEvent interface {
		TransferNotify(ctx context.Context, transferEvent entity.TransferEvent)
	}

	balanceEvent struct {
		transferObservers []TransferObserver
	}
)

func (b *balanceEvent) TransferNotify(
	ctx context.Context, transferEvent entity.TransferEvent,
) {
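	for _, eachObserver := range b.transferObservers {
		// Observers run sequentially, in registration order. The error
		// returned by Update is discarded in this simple version; log or
		// collect it here if failures need to be visible.
		_ = eachObserver.Update(ctx, transferEvent)
	}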
}
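The goroutine-based variant mentioned earlier could look roughly like the sketch below. It is not part of the original code: asyncBalanceEvent is only named in passing in the article, TransferEvent is a local stand-in for entity.TransferEvent, and countingObserver is a hypothetical test double. The sketch assumes we still want to wait for every observer before returning.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// TransferEvent is a local stand-in for entity.TransferEvent.
type TransferEvent struct {
	SourceUsername      string
	DestinationUsername string
	Amount              float64
}

// TransferObserver mirrors the interface from the event package.
type TransferObserver interface {
	Update(ctx context.Context, transferEvent TransferEvent) error
}

// asyncBalanceEvent is a hypothetical goroutine-based implementation.
type asyncBalanceEvent struct {
	transferObservers []TransferObserver
}

// TransferNotify runs every Update in its own goroutine and waits for all
// of them, so the caller still knows when the observers are done.
func (b *asyncBalanceEvent) TransferNotify(ctx context.Context, transferEvent TransferEvent) {
	var wg sync.WaitGroup
	for _, eachObserver := range b.transferObservers {
		wg.Add(1)
		go func(o TransferObserver) {
			defer wg.Done()
			if err := o.Update(ctx, transferEvent); err != nil {
				fmt.Println("observer failed:", err)
			}
		}(eachObserver)
	}
	wg.Wait()
}

// countingObserver is a test double that counts calls safely.
type countingObserver struct {
	mu    sync.Mutex
	calls int
}

func (c *countingObserver) Update(ctx context.Context, transferEvent TransferEvent) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.calls++
	return nil
}

// runAsyncDemo registers the same observer three times and returns how
// many times it was called.
func runAsyncDemo() int {
	counter := &countingObserver{}
	b := &asyncBalanceEvent{
		transferObservers: []TransferObserver{counter, counter, counter},
	}
	b.TransferNotify(context.Background(), TransferEvent{
		SourceUsername: "alice", DestinationUsername: "bob", Amount: 10,
	})
	return counter.calls
}

func main() {
	fmt.Println("observer calls:", runAsyncDemo())
}
```

Waiting on the WaitGroup keeps the caller's latency tied to the slowest observer. A fire-and-forget version would return sooner, but it would need its own context and error reporting, since the request context may be cancelled once the caller returns.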

Here’s the inject.go of the event package, where we register all of the needed subscribers/observers:

package event

import (
	"sample-wallet/api/marketingtools"
	"sample-wallet/email/transferemail"
	"sample-wallet/repo/balancestatrepo"

	"sync"
)

var (
	BalanceEventOnce sync.Once
	balanceEventSvc  BalanceEvent
)

func InjectNewBalanceService() BalanceEvent {
	BalanceEventOnce.Do(func() {
		balanceStatRepo := balancestatrepo.InjectNewBalanceRepo()
		transferEmail := transferemail.InjectNewTransferEmail()
		marketingtools := marketingtools.InjectNewMarketingToolsApi()

		balanceEventSvc = &balanceEvent{
			transferObservers: []TransferObserver{
				balanceStatRepo, transferEmail, marketingtools,
			},
		}
	})

	return balanceEventSvc
}

Later, if we want to add a Kafka call (moving to pub-sub), we can just add something like kafkaApi to []TransferObserver, again without changing the event code.
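As a rough idea of what such a broker-publishing observer might look like, here is a sketch. Everything broker-related in it is an assumption: Publisher is a hypothetical interface we would wrap around a real Kafka or RabbitMQ client, the topic name is made up, and memoryPublisher only exists so the sketch runs without a broker. TransferEvent is a local stand-in for entity.TransferEvent.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// TransferEvent is a local stand-in for entity.TransferEvent.
type TransferEvent struct {
	SourceUsername      string
	DestinationUsername string
	Amount              float64
}

// Publisher is a hypothetical abstraction over a message broker client.
type Publisher interface {
	Publish(ctx context.Context, topic string, payload []byte) error
}

// brokerObserver forwards each transfer event to the broker. It has the
// same Update signature as the other observers.
type brokerObserver struct {
	publisher Publisher
	topic     string
}

func (k *brokerObserver) Update(ctx context.Context, transferEvent TransferEvent) error {
	payload, err := json.Marshal(transferEvent)
	if err != nil {
		return fmt.Errorf("marshal transfer event: %w", err)
	}
	return k.publisher.Publish(ctx, k.topic, payload)
}

// memoryPublisher is an in-memory stand-in for a real broker client.
type memoryPublisher struct {
	messages []string
}

func (m *memoryPublisher) Publish(ctx context.Context, topic string, payload []byte) error {
	m.messages = append(m.messages, topic+" "+string(payload))
	return nil
}

// runBrokerDemo sends one event through the observer and returns what
// the stand-in broker received.
func runBrokerDemo() []string {
	pub := &memoryPublisher{}
	obs := &brokerObserver{publisher: pub, topic: "transfer.completed"}
	_ = obs.Update(context.Background(), TransferEvent{
		SourceUsername: "alice", DestinationUsername: "bob", Amount: 10,
	})
	return pub.messages
}

func main() {
	for _, msg := range runBrokerDemo() {
		fmt.Println(msg)
	}
}
```

Because the observer depends only on the small Publisher interface, swapping the in-memory stand-in for a real client would not touch the event package.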

The Observers/Subscribers

Each of balanceStatRepo, transferEmail and marketingtools must be a type that implements the TransferObserver interface above.

Here’s an example of the balanceStatRepo implementation:

func (b *balanceStatMemory) Update(ctx context.Context, transferEvent entity.TransferEvent) error {
	err := b.UpdateOutboundTransferTotalByUsernameToRank(ctx, transferEvent)
	if err != nil {
		return err
	}

	err = b.UpdateTopTransferByUsernameToRank(ctx, transferEvent)
	if err != nil {
		return err
	}

	return nil
}

The Update method here must follow the Update signature in TransferObserver:

Update(ctx context.Context, transferEvent entity.TransferEvent) error
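One optional way to have the compiler enforce this contract is a compile-time assertion next to the type. The sketch below is an assumption-laden illustration: the balanceStatMemory here is a heavily simplified stand-in (it only sums outbound amounts in a map), and TransferEvent is a local copy of entity.TransferEvent.

```go
package main

import (
	"context"
	"fmt"
)

// TransferEvent is a local stand-in for entity.TransferEvent.
type TransferEvent struct {
	SourceUsername      string
	DestinationUsername string
	Amount              float64
}

// TransferObserver mirrors the interface from the event package.
type TransferObserver interface {
	Update(ctx context.Context, transferEvent TransferEvent) error
}

// balanceStatMemory is a simplified stand-in for the real repo.
type balanceStatMemory struct {
	outboundTotal map[string]float64
}

// Compile-time check: the build fails if *balanceStatMemory ever stops
// satisfying TransferObserver (for example after a signature change).
var _ TransferObserver = (*balanceStatMemory)(nil)

// Update adds the transferred amount to the sender's outbound total.
func (b *balanceStatMemory) Update(ctx context.Context, transferEvent TransferEvent) error {
	b.outboundTotal[transferEvent.SourceUsername] += transferEvent.Amount
	return nil
}

// runRankDemo applies two transfers from the same user and returns the
// accumulated outbound total.
func runRankDemo() float64 {
	repo := &balanceStatMemory{outboundTotal: map[string]float64{}}
	ctx := context.Background()
	_ = repo.Update(ctx, TransferEvent{SourceUsername: "alice", DestinationUsername: "bob", Amount: 10})
	_ = repo.Update(ctx, TransferEvent{SourceUsername: "alice", DestinationUsername: "carol", Amount: 5})
	return repo.outboundTotal["alice"]
}

func main() {
	fmt.Println("alice outbound total:", runRankDemo())
}
```

The blank-identifier assignment costs nothing at runtime; it only makes a broken contract show up at build time, in the file that defines the observer.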

So we get the idea here: TransferNotify above receives a transferEvent entity.TransferEvent argument and passes it down to this observer. If we need to change that behavior, we just update (b *balanceEvent) TransferNotify above.

Btw, here’s the entity.TransferEvent sample:

	TransferEvent struct {
		SourceUsername      string
		DestinationUsername string
		Amount              float64
	}

Each event surely has different data to send to its observers/subscribers. The idea here is to conform to the agreed contract of the TransferObserver interface above, so other developers can follow it when creating other observers/subscribers.

The Event Trigger

And here’s the transfer code. As we can see, no subscriber logic pollutes it. No matter how many subscribers there are, this code won’t change. We can keep adding subscribers by implementing the TransferObserver interface. Even adding a publisher to Kafka or RabbitMQ (if done via an observer/subscriber) won’t change this code. Hiding the Kafka/RabbitMQ implementation behind the BalanceEvent interface is a way to follow the open-closed principle in SOLID.

https://en.wikipedia.org/wiki/Open%E2%80%93closed_principle


// ...

	balanceDomainService struct {
		balanceDomainRepo balancedomainrepo.BalanceDomainRepo
		balanceEvent      event.BalanceEvent
	}

// ...

func (b *balanceDomainService) Transfer(ctx context.Context, sourceUsername string, destinationUsername string, amount float64) error {
	err := b.balanceDomainRepo.Transfer(ctx, sourceUsername, destinationUsername, amount)
	if err != nil {
		return err
	}

	b.balanceEvent.TransferNotify(ctx, entity.TransferEvent{
		SourceUsername:      sourceUsername,
		DestinationUsername: destinationUsername,
		Amount:              amount,
	})

	return nil
}
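A side benefit of depending on the BalanceEvent interface is that the service can be exercised with a test double. The sketch below assumes a trimmed-down setup: transferRepo, stubRepo and spyEvent are hypothetical helpers written for this illustration, and TransferEvent is a local stand-in for entity.TransferEvent. The Transfer method itself follows the listing above.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// TransferEvent is a local stand-in for entity.TransferEvent.
type TransferEvent struct {
	SourceUsername      string
	DestinationUsername string
	Amount              float64
}

// BalanceEvent mirrors the interface from the event package.
type BalanceEvent interface {
	TransferNotify(ctx context.Context, transferEvent TransferEvent)
}

// transferRepo is a hypothetical, trimmed-down repo interface.
type transferRepo interface {
	Transfer(ctx context.Context, sourceUsername, destinationUsername string, amount float64) error
}

type balanceDomainService struct {
	balanceDomainRepo transferRepo
	balanceEvent      BalanceEvent
}

// Transfer notifies the observers only after the repo call succeeds.
func (b *balanceDomainService) Transfer(ctx context.Context, sourceUsername string, destinationUsername string, amount float64) error {
	if err := b.balanceDomainRepo.Transfer(ctx, sourceUsername, destinationUsername, amount); err != nil {
		return err
	}
	b.balanceEvent.TransferNotify(ctx, TransferEvent{
		SourceUsername:      sourceUsername,
		DestinationUsername: destinationUsername,
		Amount:              amount,
	})
	return nil
}

// stubRepo returns a preset error from Transfer.
type stubRepo struct{ err error }

func (s stubRepo) Transfer(ctx context.Context, sourceUsername, destinationUsername string, amount float64) error {
	return s.err
}

// spyEvent records every event it is notified about.
type spyEvent struct{ events []TransferEvent }

func (s *spyEvent) TransferNotify(ctx context.Context, transferEvent TransferEvent) {
	s.events = append(s.events, transferEvent)
}

// notifiedCount runs one transfer against a repo that returns repoErr
// and reports how many notifications were sent.
func notifiedCount(repoErr error) int {
	spy := &spyEvent{}
	svc := &balanceDomainService{balanceDomainRepo: stubRepo{err: repoErr}, balanceEvent: spy}
	_ = svc.Transfer(context.Background(), "alice", "bob", 10)
	return len(spy.events)
}

// runServiceDemo returns the notification counts for a successful and a
// failed transfer.
func runServiceDemo() (onSuccess, onFailure int) {
	return notifiedCount(nil), notifiedCount(errors.New("insufficient balance"))
}

func main() {
	onSuccess, onFailure := runServiceDemo()
	fmt.Println("notifications on success:", onSuccess)
	fmt.Println("notifications on failure:", onFailure)
}
```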

We inject the balanceEvent as usual in inject.go:

func InjectNewBalanceDomainService() BalanceDomainService {
	balanceDomainServiceOnce.Do(func() {
		balanceDomainRepo := balancedomainrepo.InjectNewBalanceDomainRepoUseMemory()
		balanceEvent := event.InjectNewBalanceService()
		balanceDomainSvc = &balanceDomainService{
			balanceDomainRepo: balanceDomainRepo,
			balanceEvent:      balanceEvent,
		}
	})

	return balanceDomainSvc
}

event.InjectNewBalanceService() here is the key to connecting the event with its subscribers/observers.
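Since the inject function is guarded by sync.Once, every caller should end up sharing the same event instance and the same list of observers. The sketch below illustrates that behavior in isolation; the balanceEvent here is simplified to hold observer names only, and buildCount is a hypothetical counter added just to make the effect visible.

```go
package main

import (
	"fmt"
	"sync"
)

// balanceEvent is simplified here to hold observer names only.
type balanceEvent struct {
	observerNames []string
}

var (
	balanceEventOnce sync.Once
	balanceEventSvc  *balanceEvent
	buildCount       int // hypothetical counter, for illustration only
)

// InjectNewBalanceService builds the event once and returns the same
// instance on every later call.
func InjectNewBalanceService() *balanceEvent {
	balanceEventOnce.Do(func() {
		buildCount++
		balanceEventSvc = &balanceEvent{
			observerNames: []string{"balanceStatRepo", "transferEmail", "marketingtools"},
		}
	})
	return balanceEventSvc
}

// runInjectDemo calls the inject function twice and reports whether both
// calls returned the same instance and how often it was built.
func runInjectDemo() (same bool, builds int) {
	first := InjectNewBalanceService()
	second := InjectNewBalanceService()
	return first == second, buildCount
}

func main() {
	same, builds := runInjectDemo()
	fmt.Println("same instance:", same)
	fmt.Println("times built:", builds)
}
```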