Why Event Sourcing Is Hard?

Mateusz Kubuszok

About me

Agenda

  • What is Event Sourcing?

  • Pitfalls of processing events

  • Pitfalls of creating events

  • Other pitfalls

  • Summary

What is Event Sourcing?

case class User(id: String, name: String)
// called directly by UI
def changeUsername(userID: String, newName: String): Unit =
  val user = getUser(userID)
  val updatedUser = user.copy(name = newName)
  saveUser(updatedUser)
// fetch from DB
def getUser(userID: String): User

// saves to DB
def saveUser(user: User): Unit
trait UserActionListener:
  def changeUsername(userID: String, newName: String): Unit
// added to UI
val userActionListeners: List[UserActionListener]

// still in UI
def changeUsername(userID: String, newName: String): Unit =
  userActionListeners.foreach { listener =>
    listener.changeUsername(userID, newName)
  }
enum UserAction:
  case ChangeUsername(userID: String, newName: String)

trait UserActionEventBus:
  def publish(action: UserAction): Unit
  def subscribe(f: UserAction => Unit): Unit
// in UI
val bus: UserActionEventBus
def changeUsername(userID: String, newName: String): Unit =
  bus.publish(UserAction.ChangeUsername(userID, newName))
// elsewhere
bus.subscribe {
  case UserAction.ChangeUsername(userID, newName) => ...
  case _ => ()
}
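The slide leaves `UserActionEventBus` abstract. A minimal in-memory sketch, assuming a single-threaded process and synchronous delivery, shows the key property: the publisher no longer knows who listens. `InMemoryUserActionEventBus` and `renamesSeen` are hypothetical names, not part of any library.

```scala
import scala.collection.mutable.ListBuffer

enum UserAction:
  case ChangeUsername(userID: String, newName: String)

trait UserActionEventBus:
  def publish(action: UserAction): Unit
  def subscribe(f: UserAction => Unit): Unit

// Hypothetical in-memory bus: synchronously hands every published action to all subscribers
final class InMemoryUserActionEventBus extends UserActionEventBus:
  private val subscribers = ListBuffer.empty[UserAction => Unit]

  def subscribe(f: UserAction => Unit): Unit =
    subscribers += f

  def publish(action: UserAction): Unit =
    subscribers.foreach(subscriber => subscriber(action))

// Hypothetical helper: subscribes a recorder, publishes the actions, returns what it saw
def renamesSeen(bus: UserActionEventBus, actions: List[UserAction]): List[(String, String)] =
  val seen = ListBuffer.empty[(String, String)]
  bus.subscribe {
    case UserAction.ChangeUsername(userID, newName) => seen += ((userID, newName))
  }
  actions.foreach(bus.publish)
  seen.toList
```

A production bus would usually be a message broker, with delivery and ordering guarantees that this sketch ignores.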
enum UserEvent:
  def userID: String

  case UsernameChanged(userID: String, newName: String)
// journal of UserEvents
// instead of List it might be Stream, Iterator or Producer
val userEvents: List[UserEvent]
userEvents.foldLeft(Map.empty[String, User]) {
  (users, event) =>
    event match
      case UserEvent.UsernameChanged(id, name) =>
        val user = users.getOrElse(id, User(id, ""))
        val newUser = user.copy(name = name)
        users.updated(id, newUser)
} // returns Users with all events applied
userEvents.foldLeft(0L) {
  (nameChanges, event) =>
    event match
      case UserEvent.UsernameChanged(_, _) =>
        nameChanges + 1
} // returns how many times users changed their names
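Both folds above run as-is once the journal is a plain `List`. A self-contained sketch (the function names `projectUsers` and `countRenames` are hypothetical) showing that the same journal feeds two independent projections:

```scala
case class User(id: String, name: String)

enum UserEvent:
  def userID: String
  case UsernameChanged(userID: String, newName: String)

// Projection 1: the current state of every user, keyed by ID
def projectUsers(events: List[UserEvent]): Map[String, User] =
  events.foldLeft(Map.empty[String, User]) { (users, event) =>
    event match
      case UserEvent.UsernameChanged(id, name) =>
        val user = users.getOrElse(id, User(id, ""))
        users.updated(id, user.copy(name = name))
  }

// Projection 2: how many renames happened in total
def countRenames(events: List[UserEvent]): Long =
  events.foldLeft(0L) { (nameChanges, event) =>
    event match
      case UserEvent.UsernameChanged(_, _) => nameChanges + 1
  }
```

For a journal with the renames "1" to "foo", "2" to "bar" and "1" to "baz", `projectUsers` keeps only the latest name per user, while `countRenames` returns 3.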
enum UserEvent:
  def userID: String

  case UsernameChanged(userID: String, newName: String)
// journal of UserEvents
// in practice this has to be streamed from an external source
val userEvents: List[UserEvent]
userEvents.foldLeft(Map.empty[String, User]) {
  (users, event) =>
    event match
      case UserEvent.UsernameChanged(id, name) =>
        val user = users.getOrElse(id, User(id, ""))
        val newUser = user.copy(name = name)
        users.updated(id, newUser)
} // result should be stored somewhere
def fetchUsers(): Map[String, User] // projection state ATM
val lastOffset: Long // last processed event
def userEvents(offset: Long): Stream[(UserEvent, Long)]
userEvents(lastOffset).foldLeft(
  fetchUsers()
) { (users, eventWithOffset) =>
  eventWithOffset match
    case (UserEvent.UsernameChanged(id, name), offset) =>
      val user = users.getOrElse(id, User(id, ""))
      val newUser = user.copy(name = name)
      val newUsers = users.updated(id, newUser)
      saveUsers(newUsers) // persists projection's state
      saveOffset(offset) // persists projection's offset
      newUsers
}
def fetchUsers(): Map[String, User] // might be large!
val lastOffset: Long // offset absent before the 1st run!
def userEvents(offset: Long): Stream[(UserEvent, Long)]
userEvents(lastOffset).foldLeft(
  fetchUsers()
) { (users, eventWithOffset) =>
  eventWithOffset match
    case (UserEvent.UsernameChanged(id, name), offset) =>
      val user = users.getOrElse(id, User(id, ""))
      val newUser = user.copy(name = name)
      val newUsers = users.updated(id, newUser)
      saveUsers(newUsers) // persists projection's state
      saveOffset(offset) // persists projection's offset
      newUsers // we don't need to return a value here!
} // and neither here!
// persisted for each projection
def fetchLastOffset(): Option[String]
// returns User's projection's state by their ID
def fetchUser(userID: String): Option[User]
// no state passed (we use DB now) and no result returned
// allows us to use .foreach instead of .foldLeft
eventsFrom(fetchLastOffset()).foreach { eventWithOffset =>
  eventWithOffset match
    case (UserEvent.UsernameChanged(id, name), offset) =>
      val user = fetchUser(id).getOrElse(User(id, "")) // current projection state
      val newUser = user.copy(name = name)
      saveUser(newUser) // persists projection's state
      saveOffset(offset) // persists projection's offset
} // returns Unit (void)
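The `.foreach` version can be exercised with an in-memory stand-in for the database. In this sketch `ProjectionDb`, the `Vector`-based journal and the rule that an event's offset is its index are all assumptions; the point is that a restarted projection reads its stored offset and only processes events it has not seen yet.

```scala
import scala.collection.mutable

case class User(id: String, name: String)

enum UserEvent:
  def userID: String
  case UsernameChanged(userID: String, newName: String)

// Hypothetical stand-in for the projection's database: its state plus its last offset
final class ProjectionDb:
  private val users = mutable.Map.empty[String, User]
  private var lastOffset: Option[Long] = None

  def fetchUser(userID: String): Option[User] = users.get(userID)
  def saveUser(user: User): Unit = users.update(user.id, user)
  def fetchLastOffset(): Option[Long] = lastOffset
  def saveOffset(offset: Long): Unit = lastOffset = Some(offset)

// Hypothetical journal reader: an event's offset is its index in the journal,
// and reading resumes right after the last processed offset (or at 0 on the 1st run)
def eventsFrom(journal: Vector[UserEvent], lastOffset: Option[Long]): Iterator[(UserEvent, Long)] =
  val start = lastOffset.fold(0L)(_ + 1)
  journal.iterator.zipWithIndex
    .map { case (event, index) => (event, index.toLong) }
    .dropWhile { case (_, offset) => offset < start }

// Runs the projection over whatever is new; returns how many events it processed
def runProjection(db: ProjectionDb, journal: Vector[UserEvent]): Int =
  var processed = 0
  eventsFrom(journal, db.fetchLastOffset()).foreach {
    case (UserEvent.UsernameChanged(id, name), offset) =>
      val user = db.fetchUser(id).getOrElse(User(id, ""))
      db.saveUser(user.copy(name = name)) // persists projection's state
      db.saveOffset(offset)               // persists projection's offset
      processed += 1
  }
  processed
```

Running it on two events and then again on the same journal with one appended event processes 2 events and then only 1.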

Pitfalls of processing events

Committing offset upfront

eventsFrom(fetchLastOffset()).foreach {
  case (UserEvent.UsernameChanged(id, name), offset) =>
    val user = fetchUser(id).getOrElse(User(id, ""))
    val newUser = user.copy(name = name)
    saveUser(newUser)
    saveOffset(offset)
}
// what if this... \/
eventsFrom(fetchLastOffset()).foreach {
  case (UserEvent.UsernameChanged(id, name), offset) =>
    val user = fetchUser(id).getOrElse(User(id, ""))
    val newUser = user.copy(name = name)
    saveUser(newUser)
    saveOffset(offset) // <- ...and this were done for you?
}
// 1. fetches the last offset of the projection
// 2. starts streaming events since the last offset
// 3. commits offset before returning event
def newEvents(): Stream[UserEvent]
newEvents().foreach {
  // no need to pass offset here!
  case UserEvent.UsernameChanged(id, name) =>
    val user = fetchUser(id).getOrElse(User(id, ""))
    val newUser = user.copy(name = name)
    saveUser(newUser) // nor save it here!
}
// 1. fetches the last offset of the projection
// 2. starts streaming events since the last offset
// 3. commits offset before returning event
def newEvents(): Stream[UserEvent]
newEvents().foreach {
  case UserEvent.UsernameChanged(id, name) =>
    // let's say the process crashed when we were here
    val user = fetchUser(id).getOrElse(User(id, ""))
    val newUser = user.copy(name = name)
    saveUser(newUser)
}
// 1. fetches the last offset of the projection
// 2. starts streaming events since the last offset
// 3. DOES NOT commit offset before returning event
def newEvents(): Stream[(UserEvent, Offset)]
newEvents().map {
  case (UserEvent.UsernameChanged(id, name), offset) =>
    val user = fetchUser(id).getOrElse(User(id, ""))
    val newUser = user.copy(name = name)
    saveUser(newUser)
    offset
}.foreach(offset => saveOffset(offset))
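To compare the two orderings, the sketch below simulates a crash between "commit" and "process" (`Crash`, `Store`, `commitUpfront`, `commitAfter` and `runWithRestart` are hypothetical). With a crash at offset 1 of the journal e0, e1, e2, committing upfront processes e0, e2 (e1 is lost), while committing after processing yields e0, e1, e1, e2 (e1 is processed twice). That is at-most-once versus at-least-once delivery, which is why idempotency comes next.

```scala
import scala.collection.mutable

// Hypothetical signal used to simulate the process being killed mid-event
final class Crash extends RuntimeException("simulated crash")

// Hypothetical durable store: survives the "crash" the way a database would
final class Store:
  val processed = mutable.ListBuffer.empty[String]
  var committed: Option[Int] = None
  def nextOffset: Int = committed.fold(0)(_ + 1)

// Commit first, then process: dying in between skips the event (at-most-once)
def commitUpfront(journal: Vector[String], store: Store, crashAt: Option[Int]): Unit =
  for offset <- store.nextOffset until journal.size do
    store.committed = Some(offset)
    if crashAt.contains(offset) then throw Crash()
    store.processed += journal(offset)

// Process first, then commit: dying in between replays the event (at-least-once)
def commitAfter(journal: Vector[String], store: Store, crashAt: Option[Int]): Unit =
  for offset <- store.nextOffset until journal.size do
    store.processed += journal(offset)
    if crashAt.contains(offset) then throw Crash()
    store.committed = Some(offset)

// Runs once with a crash at `crashAt`, restarts without crashing, returns what was processed
def runWithRestart(
    run: (Vector[String], Store, Option[Int]) => Unit,
    journal: Vector[String],
    crashAt: Int
): List[String] =
  val store = Store()
  try run(journal, store, Some(crashAt))
  catch case _: Crash => () // the process died; `store` keeps what was written
  run(journal, store, None)
  store.processed.toList
```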

Lack of idempotency

newEvents().map {
  case (UserEvent.UsernameChanged(id, name), offset) =>
    val user = fetchUser(id).getOrElse(User(id, ""))
    val newUser = user.copy(name = name)
    saveUser(newUser)
    sendUserNotification(newUser, "Name updated")
    offset
}.foreach(offset => saveOffset(offset))
newEvents().map {
  case (UserEvent.UsernameChanged(id, name), offset) =>
    val user = fetchUser(id).getOrElse(User(id, ""))
    val newUser = user.copy(name = name)
    saveUser(newUser)
    sendUserNotification(newUser, "Name updated")
    offset // app killed while here
}.foreach(offset => saveOffset(offset))
newEvents().map { // add event ID  \/  should be unique
  case (UserEvent.UsernameChanged(eid, id, name), offset) =>
    val user = fetchUser(id).getOrElse(User(id, ""))
    val newUser = user.copy(name = name)
    saveUser(newUser, eid)
    sendUserNotification(newUser, "Name updated", eid)
    offset // app killed while here
}.foreach(offset => saveOffset(offset))
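One way to use `eid` on the side-effect side is to remember which event IDs were already handled, so a replayed event does not notify the user twice. A minimal sketch, assuming the sender can keep such a set (`Notifier` is hypothetical):

```scala
import scala.collection.mutable

case class User(id: String, name: String)

// Hypothetical notification sender that remembers which event IDs it already handled
final class Notifier:
  private val handledEventIds = mutable.Set.empty[String]
  val sent = mutable.ListBuffer.empty[(String, String)]

  def sendUserNotification(user: User, message: String, eventId: String): Unit =
    // Set.add returns false if the ID was already present, so a replayed event is ignored
    if handledEventIds.add(eventId) then sent += ((user.id, message))
```

The sketch records the ID before "sending", so a crash between the two steps loses the notification instead of duplicating it. Closing that gap requires the receiving system to accept an idempotency key itself, which external APIs may not offer.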

Shared global state

val events = List(
  ...
  UserEvent.UsernameChanged(..., id="user-1", name="foo"),
  ...
  UserEvent.UsernameChanged(..., id="user-2", name="foo"),
  ...
)

A few months later, let’s introduce the requirement that usernames be globally unique. The events already in the journal may violate it, and they cannot be changed.
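Replaying the journal makes the conflict visible. The sketch below (the function `uniquenessViolations` is hypothetical) tracks who holds each name and reports names claimed while another user still held them. For a journal in which user-1 still holds "foo" when user-2 claims it, it reports "foo": history that the new rule forbids but that a projection still has to handle.

```scala
enum UserEvent:
  def userID: String
  case UsernameChanged(userID: String, newName: String)

// Replays the journal, tracking who currently holds each name, and reports
// every name that a user claimed while another user still held it
def uniquenessViolations(events: List[UserEvent]): List[String] =
  val (_, _, violations) = events.foldLeft(
    (Map.empty[String, String], Map.empty[String, String], List.empty[String])
  ) {
    case ((nameOf, ownerOf, found), UserEvent.UsernameChanged(id, name)) =>
      // the user's previous name becomes free again
      val released = nameOf.get(id).fold(ownerOf)(oldName => ownerOf - oldName)
      val conflict = released.get(name).exists(_ != id)
      val nextFound = if conflict then name :: found else found
      (nameOf.updated(id, name), released.updated(name, id), nextFound)
  }
  violations.reverse
```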

Projecting events in parallel within the same projection

...
// user quickly changed their name twice
UserEvent.UsernameChanged(..., id="user-1", name="foo"),
UserEvent.UsernameChanged(..., id="user-1", name="bar"),
UserEvent.UsernameChanged(..., id="user-2", name="foo"),
...

Node 1:

...
UserEvent.UsernameChanged(..., id="user-1", name="foo"),
UserEvent.UsernameChanged(..., id="user-2", name="foo"),
...

Node 2:

...
UserEvent.UsernameChanged(..., id="user-1", name="bar"),
...
race 1
// user quickly changed their name twice
UserEvent.UsernameChanged(..., id="user-1", name="foo"),
UserEvent.UsernameChanged(..., id="user-1", name="bar"),
// then another user takes their old name
UserEvent.UsernameChanged(..., id="user-2", name="foo"),

Node 1:

...
UserEvent.UsernameChanged(..., id="user-1", name="foo"),
UserEvent.UsernameChanged(..., id="user-1", name="bar"),
...

Node 2:

...
UserEvent.UsernameChanged(..., id="user-2", name="foo"),
...
race 2
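A common mitigation for race 1, assumed here rather than taken from the talk, is to route events by user ID instead of spreading them freely, so one node sees all of a user's events in journal order. A sketch, where `nodeFor` and `partition` are hypothetical:

```scala
enum UserEvent:
  def userID: String
  case UsernameChanged(userID: String, newName: String)

// Hypothetical routing rule: every event of a given user goes to the same node
def nodeFor(event: UserEvent, nodes: Int): Int =
  Math.floorMod(event.userID.hashCode, nodes)

// Splits the journal between nodes; groupBy keeps each node's events in journal order
def partition(events: List[UserEvent], nodes: Int): Map[Int, List[UserEvent]] =
  events.groupBy(event => nodeFor(event, nodes))
```

This does not fix race 2: user-1 and user-2 may still land on different nodes, so user-2's "foo" can be projected before user-1's "bar", and any check spanning several users (such as global name uniqueness) sees an inconsistent view.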

Pitfalls of creating events

Lack of idempotency

// sent to the bounded context when we want to change something
enum UserCommand:
  case ChangeUsername(id: String, name: String)

// returned when the command failed
enum UserError:
  case UsernameTaken
  case UserNotExist
val handleUserCommand:
    UserCommand => Either[UserError, UserEvent] = {
  case UserCommand.ChangeUsername(id, name) =>
    if userExists(id) then Right(
      UserEvent.UsernameChanged(randomEventId(), id, name)
    )
    else Left(UserError.UserNotExist)
}
// Right values (events) are saved to events' journal
// Left values (errors) can be returned to caller
UserCommand.ChangeUsername("1", "foo") // client did 3 retries
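With `randomEventId()`, each of the client's 3 retries produces a distinct event, so the journal records 3 renames. A sketch of one fix, assuming the client attaches a command ID that stays the same across retries: derive the event ID deterministically from it (here a name-based `java.util.UUID`) and let the journal ignore IDs it already stored. `Journal` and the `cmdId` field are assumptions for illustration.

```scala
import java.nio.charset.StandardCharsets
import java.util.UUID
import scala.collection.mutable

// Commands carry an ID chosen by the client and reused on every retry (assumption)
enum UserCommand:
  case ChangeUsername(cmdId: String, id: String, name: String)

enum UserError:
  case UsernameTaken
  case UserNotExist

enum UserEvent:
  def eventId: String
  case UsernameChanged(eventId: String, userID: String, newName: String)

// Deterministic: the same command ID always yields the same event ID
def eventId(cmdId: String): String =
  UUID.nameUUIDFromBytes(cmdId.getBytes(StandardCharsets.UTF_8)).toString

// Hypothetical journal that stores each event ID at most once
final class Journal:
  private val byId = mutable.LinkedHashMap.empty[String, UserEvent]
  def append(event: UserEvent): Unit =
    if !byId.contains(event.eventId) then byId.update(event.eventId, event)
  def events: List[UserEvent] = byId.values.toList

def handleUserCommand(userExists: String => Boolean): UserCommand => Either[UserError, UserEvent] = {
  case UserCommand.ChangeUsername(cmdId, id, name) =>
    if userExists(id) then Right(UserEvent.UsernameChanged(eventId(cmdId), id, name))
    else Left(UserError.UserNotExist)
}
```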

Relying on the state from the projection

val handleUserCommand:
    UserCommand => Either[UserError, UserEvent] = {
  case UserCommand.ChangeUsername(cmdId, id, name) =>
    if !userExists(id) then Left(UserError.UserNotExist)
    //           \/ asks the DB updated by this projection
    else if usernameTaken(id, name) then Left(
      UserError.UsernameTaken
    )
    else Right(
      UserEvent.UsernameChanged(eventId(cmdId), id, name)
    )
}
// 2 users try to reserve the same username at the same time
UserCommand.ChangeUsername(id="1", name="foo")
UserCommand.ChangeUsername(id="2", name="foo")
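The race comes from check-then-act: both commands read the projection before either resulting event has been projected, so both see "foo" as free. The sketch below (all names hypothetical) replays that interleaving deterministically, then contrasts it with an atomic reservation built on `ConcurrentHashMap.putIfAbsent`, the kind of operation a `reserveUsernameForUser` would need.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical username -> owner index
final class UsernameIndex:
  private val owners = new ConcurrentHashMap[String, String]()

  // Check-then-act: the read and the write are separate steps
  def isTaken(userID: String, name: String): Boolean =
    Option(owners.get(name)).exists(_ != userID)

  def record(userID: String, name: String): Unit =
    owners.put(name, userID)

  // Atomic check-and-set: putIfAbsent returns the previous owner, or null if the name was free
  def reserve(userID: String, name: String): Boolean =
    val previous = owners.putIfAbsent(name, userID)
    previous == null || previous == userID

// Both commands are validated before the projection has applied either event
def checkThenAct(index: UsernameIndex): List[Boolean] =
  val firstAccepted = !index.isTaken("1", "foo")
  val secondAccepted = !index.isTaken("2", "foo")
  if firstAccepted then index.record("1", "foo")
  if secondAccepted then index.record("2", "foo")
  List(firstAccepted, secondAccepted)

// Same two commands, but each one reserves the name atomically
def reserveFirst(index: UsernameIndex): List[Boolean] =
  List(index.reserve("1", "foo"), index.reserve("2", "foo"))
```

The atomic version only helps if every command goes through the same reservation store; a projection that lags behind the journal cannot play that role.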

Lack of atomicity

val handleUserCommand:
    UserCommand => Either[UserError, UserEvent] = {
  case UserCommand.ChangeUsername(cmdId, id, name) =>
    if !userExists(id) then Left(UserError.UserNotExist)
    else if !reserveUsernameForUser(id, name) then Left(
      UserError.UsernameTaken
    )
    // here the server crashes :)
    else Right(
      UserEvent.UsernameChanged(eventId(cmdId), id, name)
    )
}
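The crash leaves the username reserved while no `UsernameChanged` event exists. The sketch below (all names hypothetical) shows why a retry does not automatically repair this: if reservations reject the same user asking again, the retry fails with `UsernameTaken`; if they accept it, the retry completes.

```scala
import scala.collection.mutable

enum UserError:
  case UsernameTaken
  case UserNotExist

// Hypothetical signal for "the server crashes here"
final class Crash extends RuntimeException("server crashed")

// Hypothetical reservation table, stored outside the event journal
final class Reservations(sameOwnerCanReserveAgain: Boolean):
  private val owners = mutable.Map.empty[String, String]

  def reserve(userID: String, name: String): Boolean =
    owners.get(name) match
      case None =>
        owners.update(name, userID)
        true
      case Some(owner) =>
        sameOwnerCanReserveAgain && owner == userID

// Reserve the name, then append the event; `crash` simulates dying in between
def changeUsername(
    reservations: Reservations,
    journal: mutable.ListBuffer[String],
    userID: String,
    name: String,
    crash: Boolean
): Either[UserError, Unit] =
  if !reservations.reserve(userID, name) then Left(UserError.UsernameTaken)
  else
    if crash then throw Crash()
    journal += s"UsernameChanged($userID, $name)"
    Right(())

// First attempt crashes after reserving; the client then retries the same command
def crashThenRetry(reservations: Reservations): (Either[UserError, Unit], List[String]) =
  val journal = mutable.ListBuffer.empty[String]
  try
    changeUsername(reservations, journal, "1", "foo", crash = true)
    ()
  catch case _: Crash => () // the server died between the two steps
  val retry = changeUsername(reservations, journal, "1", "foo", crash = false)
  (retry, journal.toList)
```

Even with the re-reservable variant, a client that never retries leaves an orphaned reservation, so some cleanup (for example, expiring reservations that never got an event) would still be needed. This is one possible design, not a general fix.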

Other difficulties that might (will) happen

  • your events will evolve over time and their format will change

  • you will talk to external APIs that you cannot control, which will make idempotency difficult or impossible

  • you will have bugs resulting in events that shouldn’t have been created

  • General Data Protection Regulation (GDPR), e.g. the right to erasure versus an append-only journal

  • and many more
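For the first point, one common technique is upcasting: old event formats stay in the journal untouched and are converted to the current shape while being read. A sketch, where the `V1`/`V2` names and the offset-based fallback ID are assumptions for illustration:

```scala
// Hypothetical stored formats: V1 predates event IDs, V2 carries one
enum StoredUserEvent:
  case UsernameChangedV1(userID: String, newName: String)
  case UsernameChangedV2(eventId: String, userID: String, newName: String)

// The single shape current projections are written against
final case class UsernameChanged(eventId: String, userID: String, newName: String)

// Upcasting on read: old events stay untouched in the journal and are converted here.
// The fallback ID is derived from the offset, so every replay yields the same ID.
def upcast(stored: StoredUserEvent, offset: Long): UsernameChanged =
  stored match
    case StoredUserEvent.UsernameChangedV1(userID, newName) =>
      UsernameChanged(s"legacy-$offset", userID, newName)
    case StoredUserEvent.UsernameChangedV2(eventId, userID, newName) =>
      UsernameChanged(eventId, userID, newName)
```

Deriving the fallback ID from the offset rather than generating a random one keeps replays deterministic, so ID-based idempotency still works for old events.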

Summary

Ask yourself what will happen when:

  • your process crashes/gets killed

  • someone sends the same event/command twice

  • you need to rerun the projection from some point in the past (maybe even from the start)

  • the format of your events needs to change

Would you arrive at the same state in the end?

Also ask yourself:

  • why you used Event Sourcing in the first place

  • whether the way you use it helps you achieve that goal

  • what the worst outcome of doing ES "wrong" would be

Questions?

Thank you!