/** A user: an identifier plus the user's current name. */
case class User(
    id: String,
    name: String
)
Mateusz Kubuszok
breaking things in Scala for 7+ years
breaking things for money for 10 years
breaking things for fun for 18(?) years
a little bit of open source
blog at Kubuszok.com
author of a niche ebook: Things you need to know about JVM (that matter in Scala)
What is Event Sourcing?
Pitfalls of processing events
Pitfalls of creating events
Other pitfalls
Summary
case class User(id: String, name: String)

// called directly by UI
/** Renames a user: loads the current record, replaces its name, saves it back. */
def changeUsername(userID: String, newName: String): Unit =
  val user = getUser(userID)
  val updatedUser = user.copy(name = newName)
  // persist the modified copy; saving `user` would silently discard the rename
  saveUser(updatedUser)

// fetch from DB
def getUser(userID: String): User

// saves to DB
def saveUser(user: User): Unit
/** Callback the UI invokes when the user performs an action. */
trait UserActionListener:
  /** Called when user `userID` asks to change their name to `newName`. */
  def changeUsername(userID: String, newName: String): Unit
// added to UI
val userActionListeners: List[UserActionListener]
// still in UI
/** Forwards the rename request to every registered listener, in list order. */
def changeUsername(userID: String, newName: String): Unit =
  for listener <- userActionListeners do
    listener.changeUsername(userID, newName)
/** Actions a user can trigger from the UI. */
enum UserAction:
  /** Request to rename user `userID` to `newName`. */
  case ChangeUsername(
      userID: String,
      newName: String
  )
/** Publish/subscribe channel for [[UserAction]] values. */
trait UserActionEventBus:
  /** Registers `f` as a handler of actions published on this bus. */
  def subscribe(f: UserAction => Unit): Unit

  /** Puts `action` on the bus. */
  def publish(action: UserAction): Unit
// in UI
/** Publishes a rename request instead of calling handlers directly. */
def changeUsername(userID: String, newName: String): Unit =
  bus.publish(UserAction.ChangeUsername(userID, newName))
// elsewhere
bus.subscribe {
  case UserAction.ChangeUsername(userID, newName) => ...
  case _ => () // ignore actions this subscriber does not care about
}
/** Events recorded about users; each one names the user it concerns. */
enum UserEvent:
  /** ID of the user this event is about. */
  def userID: String

  /** User `userID` changed their name to `newName`. */
  case UsernameChanged(
      userID: String,
      newName: String
  )
// journal of UserEvents
// instead of List it might be Stream, Iterator or Producer
val userEvents: List[UserEvent]
// replay the journal left to right on top of an empty user map
userEvents.foldLeft(Map.empty[String, User]) {
  case (usersById, UserEvent.UsernameChanged(id, name)) =>
    // a user seen for the first time starts with an empty name
    val current = usersById.getOrElse(id, User(id, ""))
    usersById + (id -> current.copy(name = name))
} // returns Users with all events applied
// bump the counter once for every UsernameChanged event in the journal
userEvents.foldLeft(0L) {
  case (changesSoFar, UserEvent.UsernameChanged(_, _)) => changesSoFar + 1L
} // returns how many times Users change names
/** Events recorded about users; every case exposes the affected user's ID. */
enum UserEvent:
  /** User `userID` changed their name to `newName`. */
  case UsernameChanged(userID: String, newName: String)

  /** ID of the user this event is about. */
  def userID: String
// journal of UserEvents
// this has to be streamed from the outer world
val userEvents: List[UserEvent]
userEvents.foldLeft(Map.empty[String, User]) {
  case (usersById, UserEvent.UsernameChanged(id, name)) =>
    // unknown users start with an empty name before the rename is applied
    val renamed = usersById.getOrElse(id, User(id, "")).copy(name = name)
    usersById + (id -> renamed)
} // result should be stored somewhere
// projection state ATM
def fetchUsers(): Map[String, User]
// last processed event
val lastOffset: Long
def userEvents(offset: Long): Stream[(UserEvent, Long)]
// resume from the stored offset, seeding the fold with the stored projection state
userEvents(lastOffset).foldLeft(fetchUsers()) {
  case (usersById, (UserEvent.UsernameChanged(id, name), offset)) =>
    val renamed = usersById.getOrElse(id, User(id, "")).copy(name = name)
    val nextUsers = usersById.updated(id, renamed)
    saveUsers(nextUsers) // persists projection's state
    saveOffset(offset) // persist projection's offset
    nextUsers
}
// NOTE(review): transitional snippet that does not compile as shown. The fold is
// seeded with a single User from `fetchUser()`, which is called with no argument,
// and `userID` is not defined in this snippet. The lambda still reads and returns
// the `users` Map of the previous version, and its `user` parameter is shadowed
// by `val user` below.
def fetchUsers(): Map[String, User] // might be large!
val lastOffset: Long // offset absent before the 1st run!
def userEvents(offset: Long): Stream[(UserEvent, Long)]
userEvents(lastOffset).foldLeft(
fetchUser().getOrElse(User(id = userID, name = ""))
) { (user, eventWithOffset) =>
eventWithOffset match
case (UserEvent.UsernameChanged(id, name), offset) =>
val user = users.getOrElse(id, User(id, ""))
val newUser = user.copy(name = name)
val newUsers = users.updated(id, newUser)
saveUsers(newUsers) // persists projection's state
saveOffset(offset) // persist projection's offset
newUsers // we don't need to return a value here!
} // and neither here!
// persisted for each projection
/** Offset the projection last committed; presumably None before its first run. */
def fetchLastOffset(): Option[String]
// returns User's projection's state by their ID
/** Projection state for `userID`, or None if it has none yet. */
def fetchUser(userID: String): Option[User]
// no state passed (we use DB now) and no result returned
// allows us to use .foreach instead of .foldLeft
eventsFrom(fetchLastOffset()).foreach { eventWithOffset =>
  eventWithOffset match
    case (UserEvent.UsernameChanged(id, name), offset) =>
      // current projection state; fetchUser returns an Option, so a user
      // without stored state starts from an empty name
      val user = fetchUser(id).getOrElse(User(id, ""))
      val newUser = user.copy(name = name)
      saveUser(newUser) // persists projection's state
      saveOffset(offset) // persist projection's offset
} // returns Unit (void)
// same loop, destructuring the (event, offset) pair directly in the `case`
eventsFrom(fetchLastOffset()).foreach {
  case (UserEvent.UsernameChanged(id, name), offset) =>
    // fetchUser returns an Option; unseen users start from an empty name
    val user = fetchUser(id).getOrElse(User(id, ""))
    val newUser = user.copy(name = name)
    saveUser(newUser)
    saveOffset(offset)
}
// what if this... \/
eventsFrom(fetchLastOffset()).foreach {
  case (UserEvent.UsernameChanged(id, name), offset) =>
    // fetchUser returns an Option; unseen users start from an empty name
    val user = fetchUser(id).getOrElse(User(id, ""))
    val newUser = user.copy(name = name)
    saveUser(newUser)
    saveOffset(offset) // <- ...and this were done for you?
}
// 1. fetches the last offset of the projection
// 2. starts streaming events since the last offset
// 3. commits offset before returning event
def newEvents(): Stream[UserEvent]
newEvents().foreach {
  // no need to pass offset here!
  case UserEvent.UsernameChanged(id, name) =>
    // fetchUser returns an Option; unseen users start from an empty name
    val user = fetchUser(id).getOrElse(User(id, ""))
    val newUser = user.copy(name = name)
    saveUser(newUser) // nor save it here!
}
// 1. fetches the last offset of the projection
// 2. starts streaming events since the last offset
// 3. commits offset before returning event
def newEvents(): Stream[UserEvent]
newEvents().foreach {
  case UserEvent.UsernameChanged(id, name) =>
    // let's say the process crashed when we were here: the offset was already
    // committed (step 3), so after a restart this event is skipped
    val user = fetchUser(id).getOrElse(User(id, ""))
    val newUser = user.copy(name = name)
    saveUser(newUser)
}
// 1. fetches the last offset of the projection
// 2. starts streaming events since the last offset
// 3. DOES NOT commit offset before returning event
def newEvents(): Stream[(UserEvent, Offset)]
newEvents().map {
  case (UserEvent.UsernameChanged(id, name), offset) =>
    // fetchUser returns an Option; unseen users start from an empty name
    val user = fetchUser(id).getOrElse(User(id, ""))
    val newUser = user.copy(name = name)
    saveUser(newUser)
    // pass the offset on; it is saved only after the event has been handled
    offset
}.foreach(offset => saveOffset(offset))
newEvents().map {
  case (UserEvent.UsernameChanged(id, name), offset) =>
    val user = fetchUser(id).getOrElse(User(id, ""))
    val newUser = user.copy(name = name)
    saveUser(newUser)
    // NOTE(review): if the process dies after this call but before saveOffset,
    // the event is replayed and the notification is sent again
    sendUserNotification(newUser, "Name updated")
    offset
}.foreach(offset => saveOffset(offset))
newEvents().map {
  case (UserEvent.UsernameChanged(id, name), offset) =>
    // fetchUser returns an Option; unseen users start from an empty name
    val user = fetchUser(id).getOrElse(User(id, ""))
    val newUser = user.copy(name = name)
    saveUser(newUser)
    sendUserNotification(newUser, "Name updated")
    offset // app killed while here
}.foreach(offset => saveOffset(offset))
newEvents().map {
  // add event ID (eid) \/ it should be unique
  case (UserEvent.UsernameChanged(eid, id, name), offset) =>
    val user = fetchUser(id).getOrElse(User(id, ""))
    val newUser = user.copy(name = name)
    // eid is passed along, presumably so these calls can recognise an event
    // already handled before a crash and avoid repeating its effects
    saveUser(newUser, eid)
    sendUserNotification(newUser, "Name updated", eid)
    offset // app killed while here
}.foreach(offset => saveOffset(offset))
// journal from before names had to be unique: two different users
// (user-1 and user-2) both ended up named "foo"
val events = List(
...
UserEvent.UsernameChanged(..., id="user-1", name="foo"),
...
UserEvent.UsernameChanged(..., id="user-2", name="foo"),
...
)
A few months later, we introduce a new requirement: usernames
must be globally unique.
...
// user quickly changed their name twice
UserEvent.UsernameChanged(..., id="user-1", name="foo"),
UserEvent.UsernameChanged(..., id="user-1", name="bar"),
// in this order user-2 takes "foo" only after user-1 has moved on to "bar"
UserEvent.UsernameChanged(..., id="user-2", name="foo"),
...
// NOTE(review): presumably illustrates events split across nodes so that
// user-1's two renames land on different nodes. Node 1 may then check
// user-2's "foo" before Node 2 has applied user-1's "bar", and wrongly
// see "foo" as still taken.
Node 1:
...
UserEvent.UsernameChanged(..., id="user-1", name="foo"),
UserEvent.UsernameChanged(..., id="user-2", name="foo"),
...
Node 2:
...
UserEvent.UsernameChanged(..., id="user-1", name="bar"),
...
// user quickly changed their name twice
UserEvent.UsernameChanged(..., id="user-1", name="foo"),
UserEvent.UsernameChanged(..., id="user-1", name="bar"),
// then another user takes their old name
UserEvent.UsernameChanged(..., id="user-2", name="foo"),
// NOTE(review): presumably partitioned by user ID. Each user's events stay
// ordered, but Node 2 may apply user-2's "foo" before Node 1 has applied
// user-1's "bar", so the cross-user order a uniqueness check needs is lost.
Node 1:
...
UserEvent.UsernameChanged(..., id="user-1", name="foo"),
UserEvent.UsernameChanged(..., id="user-1", name="bar"),
...
Node 2:
...
UserEvent.UsernameChanged(..., id="user-2", name="foo"),
...
/** Requests sent to the user bounded context when we want to change something. */
enum UserCommand:
  /** Asks to rename user `id` to `name`. */
  case ChangeUsername(
      id: String,
      name: String
  )
/** Reasons a command can be rejected; returned to the caller on failure. */
enum UserError:
  case UsernameTaken, UserNotExist
/** Validates a command and turns it into either an error for the caller (Left)
  * or the event to append to the journal (Right).
  */
val handleUserCommand: UserCommand => Either[UserError, UserEvent] = {
  case UserCommand.ChangeUsername(id, name) =>
    // NOTE(review): presumably a new ID on every call, so a retried command
    // produces a distinct event each time
    if userExists(id) then Right(
      UserEvent.UsernameChanged(randomEventId(), id, name)
    )
    else Left(UserError.UserNotExist)
}
// Right values (events) are saved to events' journal
// Left values (errors) can be returned to caller
UserCommand.ChangeUsername(id = "1", name = "foo") // the client retried this 3 times
/** Validates a command: rejects unknown users and names that are already taken,
  * otherwise emits the event (Right) to be journaled.
  */
val handleUserCommand: UserCommand => Either[UserError, UserEvent] = {
  case UserCommand.ChangeUsername(cmdId, id, name) =>
    if !userExists(id) then Left(UserError.UserNotExist)
    // \/ asks the DB updated by this projection
    else if usernameTaken(id, name) then Left(
      UserError.UsernameTaken
    )
    else Right(
      // event ID derived from the command ID, presumably deterministically,
      // so retries of the same command yield the same event
      UserEvent.UsernameChanged(eventId(cmdId), id, name)
    )
}
// two users try to reserve the same username at the same time:
UserCommand.ChangeUsername(name = "foo", id = "1")
UserCommand.ChangeUsername(name = "foo", id = "2")
/** Validates a command by reserving the username before emitting the event:
  * Left for an unknown user or a failed reservation, Right with the event otherwise.
  */
val handleUserCommand: UserCommand => Either[UserError, UserEvent] = {
  case UserCommand.ChangeUsername(cmdId, id, name) =>
    if !userExists(id) then Left(UserError.UserNotExist)
    // presumably returns false when the name is already reserved
    else if !reserveUsernameForUser(id, name) then Left(
      UserError.UsernameTaken
    )
    // here the server crashes :)
    // -> the name stays reserved although no UsernameChanged event was emitted
    else Right(
      UserEvent.UsernameChanged(eventId(cmdId), id, name)
    )
}
your events will evolve over time and their format will change
you will talk to external APIs that you cannot control, which will make idempotency difficult or impossible
you will have bugs resulting in events that should never have been emitted
General Data Protection Regulation (GDPR), e.g. honouring the right to erasure when personal data lives in an append-only event journal
and many more
Ask yourself what will happen when:
your process crashes/gets killed
someone sends the same event/command twice
you need to rerun the projection from some point in the past (maybe even from the start)
the format of your events needs to change
Would you arrive at the same state in the end?
Also ask yourself:
why you chose Event Sourcing in the first place
whether the way you use it helps you achieve your goal
what the worst outcome of doing ES "wrong" would be