HWBP #2: Reactive chat list

Implementing a reactive screen using Kotlin, Flow, and reactive Room queries.

This is part of the How We Built Primal series, an exploration of how we built a modern Android app. Primal is a mobile app that allows close friends to send video messages back and forth.

In the previous article, we discussed the offline-first architecture in use in Primal on a conceptual level. In this article, we're going to look at implementing a specific screen, by leveraging Kotlin, Flow operations, and reactive Room queries.

The Chat List Screen

Lists are some of the most common UI components within apps. Often, these lists need to stay up-to-date with data that's changing across our app. Let's look at one such list within the Primal app: the Chat List.

Here are some requirements of our Chat list:

  • It displays a list of chats the logged-in user is a member of;
  • Each row displays the following data about the chat: its name, list of members (as avatars), whether it has unread messages, and the date of the last message within it;
  • The list is ordered in reverse chronological order of the last message sent within each chat;
  • The list should update as the underlying data changes.

This kind of Chat List is a pretty standard pattern across messaging apps. You can find a similar list in apps such as WhatsApp or Telegram, perhaps with slightly different aesthetics.

Ok. Let's see how to implement it.

Entities design

Here's a brief view of the data entities that are relevant to the Chat List functionality. The scope of these models is app-wide: i.e. they are used across the whole codebase.

// A Workspace represents a group or chatroom concept.
data class Workspace(
    val id: String,
    val name: String,
    //...
)

// Represents the membership of a User in a Workspace.
data class UserWorkspace(
    val id: String,
    val user: User,
    val workspace: Workspace,
    val dateJoined: Date
)

// An individual message sent inside a chat.
data class Message(
    val id: String,
    val datePublished: Date,
    val read: Boolean,
    //...
)

Local database

In the previous article in this series, we explained that we use a local-storage-first app architecture. Instead of querying data from network servers directly, our screens consume data exclusively from the local storage (database and key-value). Let's look at the relevant queries:

interface LocalDatabaseService {
    // List of chats a user is part of
    fun observeUserWorkspaces(user: User): Flow<List<UserWorkspace>>

    // List of members of a chat
    fun observeWorkspaceUsers(workspaceId: String): Flow<List<UserWorkspace>>

    // Date of the most recent message in a chat, or null if it has none
    fun observeLatestMessageDate(workspaceId: String): Flow<Date?>

    // Number of unread messages in a chat
    fun observeUnreadMessageCount(workspaceId: String): Flow<Int>

    //...other insert, delete, fetch and observe methods
}

Instead of a typical fetchXYZ() that returns XYZ once, an observeXYZ() function returns a Flow in which each emitted value is of type XYZ.

The actual implementation of these methods depends on the database library you're using. In Room, we simply need to change the return type of the query from a List to a Flow. This change instructs Room to observe the query rather than executing it once, emitting new values as the query result changes:

@Dao
abstract class WorkspaceDao {
    @Query("SELECT...")
    abstract fun observeUserWorkspaces(userId: String): Flow<List<UserWorkspaceJoin>>

    //...other similar functions
}

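Room always emits the query result at least once: the current result is delivered as soon as the Flow is collected, and a fresh result follows whenever a table the query reads from changes. Because Room reacts to table changes rather than to row-level differences, it can occasionally re-emit an identical result; distinctUntilChanged() filters those out if they matter.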
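To make the difference between fetching and observing concrete without a real database, here is a minimal sketch that uses a MutableStateFlow as a stand-in for a table. FakeWorkspaceStore, its methods, and snapshotsAroundInsert are hypothetical names chosen for illustration; they are not part of Room or of Primal's codebase.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.update

// Hypothetical in-memory stand-in for a database table (not Room).
class FakeWorkspaceStore {
    private val names = MutableStateFlow(listOf("Family"))

    // One-shot read: returns the current snapshot and nothing more.
    fun fetchWorkspaceNames(): List<String> = names.value

    // Observable read: emits the current snapshot, then every later change.
    fun observeWorkspaceNames(): Flow<List<String>> = names

    fun insert(name: String) = names.update { it + name }
}

// Collects two snapshots; a write is simulated right after the first one arrives.
suspend fun snapshotsAroundInsert(store: FakeWorkspaceStore): List<List<String>> =
    store.observeWorkspaceNames()
        .onEach { snapshot -> if (snapshot.size == 1) store.insert("Friends") }
        .take(2)
        .toList()
```

Called from a coroutine (for example inside runBlocking), snapshotsAroundInsert(FakeWorkspaceStore()) returns [[Family], [Family, Friends]]: the collector receives the new row without asking for it, whereas a single fetchWorkspaceNames() call would only ever have seen the first list.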

The ViewModel

Let's look at how we use the above methods to actually implement the desired feature. The code within this section is placed inside a ViewModel, a place where we define the business logic for the screen. First, we define a new Chat entity which encapsulates data that represents a chat per the requirements of this particular screen:

data class Chat(
    val workspaceId: String,
    val workspaceName: String,
    val lastUpdated: Date,
    val hasUnreadMessages: Boolean? = null,
    val members: List<User>? = null
)

By the end of this section, we should have a List<Chat> that we can pass on for rendering on-screen.

We begin by observing the list of chats a user is part of:

//localDb is an implementation of the database interface presented above

fun observeUserChatList(activeUser: User): Flow<List<Chat>> {
    return localDb.observeUserWorkspaces(activeUser)
        .map { userWorkspaceList ->
            userWorkspaceList.map { // it: UserWorkspace
                // We use it.dateJoined as the placeholder for chat.lastUpdated.
                // dateJoined is non-null, so no !! assertion is needed.
                Chat(it.workspace.id, it.workspace.name, it.dateJoined)
            }
        }
}

Since our database methods are used across the whole codebase, they only return values using app-wide data entities. Therefore, we need to transform the Flow<List<UserWorkspace>> we get from the database service to a Flow<List<Chat>> that is useful to us within this screen.

Like List, Flow offers a map() function that transforms each emitted value from its original type into one of your choosing. We use the Flow's map() to convert each emitted list, and the List's map() inside it to create a new Chat object for each UserWorkspace object we get from the query.

Flows of flows: combining asynchronous streams of data

However, at this point, each Chat in the emitted list is incomplete. The hasUnreadMessages and members fields are still null, and lastUpdated only holds the dateJoined placeholder. This is because the first query alone does not provide us with this data.

To fill in the gaps, we need to further process each emitted value. Let's start by populating the lastUpdated field:

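@OptIn(ExperimentalCoroutinesApi::class) // flatMapLatest is marked experimental in kotlinx.coroutines
fun Flow<List<Chat>>.transformAddLastMessageDates(): Flow<List<Chat>> {
    return this.flatMapLatest { chats ->
        // combine() over zero flows completes without emitting, so handle
        // the "no chats" case explicitly to still deliver an empty list.
        if (chats.isEmpty()) return@flatMapLatest flowOf(emptyList<Chat>())

        // One flow per chat, each emitting that chat with its latest message date
        val flowsWithLastMessageDates = chats.map { chat ->
            localDb.observeLatestMessageDate(chat.workspaceId)
                .map { date ->
                    // No messages yet: keep the dateJoined placeholder
                    if (date != null) chat.copy(lastUpdated = date) else chat
                }
        }

        // Combine such that the dates for the whole list are available at once
        combine(flowsWithLastMessageDates) { it.asList() }
    }
}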

The most important concept introduced here is the ability to combine multiple Flows into a single Flow.

Think of a car factory. You might have 2 assembly lines, one that builds the car internals (engine, etc.), the other that builds the car externals (doors, etc.). At some point, the outputs of both lines are combined into a single object (e.g. the car), which then continues to the next steps (e.g. painting, testing, etc.).

Similarly, we can define a Flow that emits values as the result of the emissions of 2 or more Flows. This is done using operators such as the flatMap family (flatMapConcat, flatMapMerge, flatMapLatest) and combine, each with behavior suitable for different situations.

In the above code snippet, we define a function that takes in a Flow<List<Chat>> and returns another flow of the same type, but with the lastUpdated field filled in. lastUpdated is actually a value that does not exist within the Workspace table in the database. Rather, it is computed based on the most recent message within another table: Message. Moreover, this value can change over time, e.g. as users send new messages.

To satisfy these requirements, we use the flatMapLatest operator to combine the incoming flow with another flow that observes the latestMessageDate query. The list of chats emitted from the first flow is used as an input to the second flow. We use flatMapLatest particularly because we only care about the latest chat list. When the first flow emits a new chat list, the previous "latestMessageDate query Flow" is canceled, and a new one is created, with the latest chat list as an input.
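The cancellation behavior described above can be seen in isolation with a small sketch. The flows here are hypothetical stand-ins built with delay() rather than real database queries: chatLists() plays the role of the chat-list query and datesFor() plays the role of the dependent "latest message date" query.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList

// Stand-in for the chat-list query: a second list arrives 50 ms after the first.
fun chatLists(): Flow<String> = flow {
    emit("list#1")
    delay(50)
    emit("list#2")
}

// Stand-in for the dependent query: emits immediately, then again after 200 ms.
fun datesFor(chatList: String): Flow<String> = flow {
    emit("$chatList: initial dates")
    delay(200)
    emit("$chatList: updated dates")
}

// Only the inner flow belonging to the most recent chat list stays alive.
@OptIn(ExperimentalCoroutinesApi::class)
fun latestDatesOnly(): Flow<String> =
    chatLists().flatMapLatest { chatList -> datesFor(chatList) }

// Gathers everything the combined flow emits until it completes.
suspend fun collectedDates(): List<String> = latestDatesOnly().toList()
```

With these timings, collectedDates() should contain "list#1: initial dates", "list#2: initial dates", and "list#2: updated dates". The "list#1: updated dates" value never appears, because the inner flow for list#1 is canceled when list#2 arrives, before its 200 ms delay has elapsed.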

There is one detail though. We don't actually have a database function that takes in a whole list of chats and returns the "most recent message date" for all of them. Instead, we have observeLatestMessageDate() that returns this value for a single chat.

To solve this, we use the combine operator to combine the individual chat flows into a single flow for the whole list. combine takes in a list of flows and returns a single flow that emits its first value only after all of its flows have emitted their first value, and then emits again whenever any of them emits a new one. This ensures that we have dates for every chat before proceeding.
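Here is a minimal sketch of that behavior, using plain integers as stand-ins for message dates and delay() to control when each flow emits. combineAll and combinedSnapshots are hypothetical helpers written for this illustration. The sketch also covers one edge case worth knowing about: combine() over an empty list of flows completes without emitting anything, so a screen with zero chats needs explicit handling.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList

// Hypothetical helper: like combine(), but still emits (an empty list) for zero flows.
fun combineAll(flows: List<Flow<Int>>): Flow<List<Int>> =
    if (flows.isEmpty()) flowOf(emptyList<Int>())
    else combine(flows) { latest -> latest.toList() }

suspend fun combinedSnapshots(): List<List<Int>> {
    val chatA = flow { emit(1); delay(100); emit(2) } // emits immediately, updates later
    val chatB = flow { delay(50); emit(10) }          // first value arrives late
    return combineAll(listOf(chatA, chatB)).toList()
}
```

With these timings, combinedSnapshots() should return [[1, 10], [2, 10]]. Nothing is emitted while chatB is still silent, so no partial list like [1] ever reaches the collector; once every flow has a value, each new emission from any flow produces a fresh, complete list.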

Putting it all together

The above function is for the lastUpdated property. Similar transformXYZ functions are defined for the other properties we need to observe: hasUnreadMessages and members. Each function takes in a flow and returns a new flow that performs operations based on the emissions of the input flow. When put together, this is what it looks like:
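The other transform functions are not shown in this article, so the following is only a guess at their shape, following the same pattern as transformAddLastMessageDates(). It uses a trimmed-down Chat class, and it passes the unread-count query in as a hypothetical observeUnreadCount parameter (standing in for localDb.observeUnreadMessageCount) so the sketch runs on its own.

```kotlin
import java.util.Date
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map

// Trimmed-down stand-in for the article's Chat class.
data class Chat(
    val workspaceId: String,
    val lastUpdated: Date,
    val hasUnreadMessages: Boolean? = null
)

// Newest chat first, per the "reverse chronological" requirement.
fun Flow<List<Chat>>.transformSortByLastUpdateDate(): Flow<List<Chat>> =
    map { chats -> chats.sortedByDescending { it.lastUpdated } }

// observeUnreadCount is a hypothetical parameter standing in for
// localDb.observeUnreadMessageCount(workspaceId).
@OptIn(ExperimentalCoroutinesApi::class)
fun Flow<List<Chat>>.transformAddReadStatus(
    observeUnreadCount: (String) -> Flow<Int>
): Flow<List<Chat>> = flatMapLatest { chats ->
    if (chats.isEmpty()) return@flatMapLatest flowOf(emptyList<Chat>())
    val chatFlows = chats.map { chat ->
        observeUnreadCount(chat.workspaceId)
            .map { count -> chat.copy(hasUnreadMessages = count > 0) }
    }
    combine(chatFlows) { it.toList() } // keeps the incoming (sorted) order
}

// Usage with fake data: chat "b" is newer and has 3 unread messages.
suspend fun sampleChatList(): List<Chat> =
    flowOf(listOf(Chat("a", Date(1_000)), Chat("b", Date(2_000))))
        .transformSortByLastUpdateDate()
        .transformAddReadStatus { id -> flowOf(if (id == "b") 3 else 0) }
        .first()
```

In this sketch, sampleChatList() yields chat "b" first (flagged as unread) followed by chat "a" (flagged as read). Because combine preserves the order of the flows it is given, the later steps do not disturb the sort order established earlier in the chain.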

val vmScope = CoroutineScope(Dispatchers.IO) // background threads

fun observeChats(activeUser: User) { // called when the screen is created
    vmScope.launch {
        val chatListFlow = observeUserChatList(activeUser)
            .transformAddLastMessageDates()
            .transformSortByLastUpdateDate()
            .transformAddReadStatus()
            .transformAddMembers()

        chatListFlow.collect { chats ->
            // deliver the list for rendering, perhaps in a RecyclerView
        }
    }
}

Some details to note:

  • Because we defined each transformXYZ() as an extension function, we can chain their calls, which reads better than nesting them as parameters;
  • All those operations take place on background threads (Dispatchers.IO), so the UI thread is never blocked. The collect lambda runs there too, so switch to the main thread (for example with withContext(Dispatchers.Main)) before touching any views;
  • The lambda passed to chatListFlow.collect() is where you want to make use of the chat list. In our app, we deliver it to a RecyclerView for rendering.

Thank you for reading.
