COROUTINES-23: code review fixes
Разработчик проводит на порядок больше времени за чтением кода, чем за его написанием. Поэтому на код ревью мы чаще всего проверяем форматирование, нейминг, наличие тестов, наличие антипаттернов и соблюдение концепции той архитектуры, которая принята на проекте. Но лично я больше ценю комментарии тех людей, которые подсвечивают неочевидные кейсы или возможные баги.
Это очень полезный скилл, если и дальше хочешь развиваться как технический специалист. Чтобы найти ошибки чужом коде, может оказаться недостаточно вникнуть в то, что написано, надо ещё знать нюансы API тех библиотек, которые ты используешь
Привет, меня зовут Дмитрий Елизаров, я Android-разработчик команды Альфа-Инвестиций. Cегодня я как раз хотел бы рассказать о нескольких неочевидных кейсах использования корутин, и то о чём ещё стоит помнить при проверке кода с их использованием.
Emit or not, there is no try
Допустим нам на ревью присылают вот этот кусок кода.
class PersonsViewModel(
private val interactor: PersonsInteractor,
) : ViewModel() {
private val _newPersonAddedFlow = MutableSharedFlow()
val newPersonAddedFlow: SharedFlow = _newPersonAddedFlow
fun addNewPerson(person: Person) {
viewModelScope.launch {
val rowId = withContext(Dispatchers.IO) {
interactor.savePerson(person)
}
_newPersonAddedFlow.tryEmit(rowId)
}
}
fun getPersonsFlow(): Flow = interactor.getPersonsFlow()
}
На первый взгляд обычная вью-модель, с методами сохранения в БД и событием оповещения о том, что в базу был записан новый пользователь. Увидев работу с базой, открыл свой стандартный чек-лист:
диспатчинг на IO — check;
вызываем на viewModelScope — check;
. . .;
/sPROFIT/s Approved ну или NW если TODO (здесь должна быть шутка про нейминг, но я не придумал для неё название) тут уж как повезёт.
Но если обратить внимание, то заметим что tryEmit метод синхронный. Для его работы необходим буфер внутри SharedFlow, размер которого настраивается параметрами replay и extraBufferCapacity, а по дефолту они нули. Так происходит потому, что tryEmit не может «подождать» пока значение появится, о чем писал Роман Елизаров в соответствующем issue.
tryEmit (unlike emit) is not a suspending function, so it clearly cannot operate without a buffer where it can store emitted value for all the suspending subscribers to process. On the other hand, emit is suspending, so it does not need buffer space, as it can always suspend in case any of the subscribers are not ready yet.
class PersonsViewModel(
private val interactor: PersonsInteractor,
) : ViewModel() {
// либо
private val _newPersonAddedFlow = MutableSharedFlow(
replay = 1,
extraBufferCapacity = 1,
)
val newPersonAddedFlow: SharedFlow = _newPersonAddedFlow
fun addNewPerson(person: Person) {
viewModelScope.launch {
val rowId = withContext(Dispatchers.IO) {
interactor.savePerson(person)
}
// либо
_newPersonAddedFlow.emit(rowId)
}
}
fun getPersonsFlow(): Flow = interactor.getPersonsFlow()
}
Events must flow
Смотрим наш пулл реквест дальше, идём в класс фрагмента.
class PersonsFragment: Fragment(R.layout.f_persons) {
private val personsAdapter by lazy {
PersonsAdapter(requireContext())
}
private val viewModel: PersonsViewModel by viewModels {
PersonsViewModel.Factory
}
override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
super.onViewCreated(view, savedInstanceState)
view.findViewById(R.id.add_new).setOnClickListener {
val person = createRandomPerson()
viewModel.addNewPerson(person)
}
with(view.findViewById(R.id.persons)) {
adapter = personsAdapter
}
viewLifecycleOwner.lifecycleScope.launch {
viewLifecycleOwner.repeatOnLifecycle(Lifecycle.State.STARTED) {
viewModel.newPersonAddedFlow.collect(::showToast)
viewModel.getPersonsFlow().collect(::submitNewPerson)
}
}
}
private fun submitNewPerson(person: Person) {
personsAdapter.add(person)
}
private fun showToast(rowId: Long) {
val text = if (rowId != NO_ID) {
"New person added"
} else {
"No person added"
}
Toast.makeText(requireContext(), text, Toast.LENGTH_LONG).show()
}
}
Видим строчки один в один из Гугловой документации понимаем что уж кто-кто, а Гугл никогда свои доки не выпускает с ошибками и ставим автоматический аппрув.
Ставим же?
Но давайте на секунду задумаемся, когда исполнение дойдёт до строчки?
viewModel.getPersonsFlow().collect(::submitNewPerson)
?
Очевидно после того как выполнится collect.
viewModel.newPersonAddedFlow.collect (:: showToast)
А это произойдёт примерно никогда.
Но если быть точным, то…
когда завершится корутина в которой мы подписались на flow, но поскольку мы на этой же корутине пытаемся подписаться на другой flow, то в конечном счёте подписки не произойдёт.
Какой из этого можно сделать вывод? «Каждая новая подписка = новая запущенная корутина».
class PersonsFragment: Fragment(R.layout.f_persons) {
override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
super.onViewCreated(view, savedInstanceState)
view.findViewById(R.id.add_new).setOnClickListener {
val person = createRandomPerson()
viewModel.addNewPerson(person)
}
with(view.findViewById(R.id.persons)) {
adapter = personsAdapter
}
subscribeToNewPersonNotifications()
subscribeToUpdatePersonsList()
}
private fun subscribeToNewPersonNotifications() {
viewLifecycleOwner.lifecycleScope.launch {
viewLifecycleOwner.repeatOnLifecycle(Lifecycle.State.STARTED) {
viewModel.newPersonAddedFlow.collect(::showToast)
}
}
}
private fun subscribeToUpdatePersonsList() {
viewLifecycleOwner.lifecycleScope.launch {
viewLifecycleOwner.repeatOnLifecycle(Lifecycle.State.STARTED) {
viewModel.getPersonsFlow().collect(::submitNewPerson)
}
}
}
}
You never expect the coroutine context
Всем известно, что секрет надёжности проекта кроется в 2-х простых правилах.
1. Не деплой вечером в пятницу.
2. Вовремя закрывай все сетевые соединения.
Допустим, при старте нашего экрана мы хотим что-либо скачать, (или хотя бы поднять соединение). Можем написать нечто подобное:
@WorkerThread
interface SocketConnection {
fun establish()
fun close()
}
class DataActivity: AppCompatActivity() {
private lateinit var repository: Repository
private lateinit var socketConnection: SocketConnection
override fun onCreate (savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
loadData()
if (!checkKeyArguments()) {
finish()
}
}
private fun loadData() {
lifecycleScope.launch {
withContext (Dispatchers.IO) {
suspendCancellableCoroutine { continuation ->
continuation.invokeOnCancellation(::closeConnectionEmergency)
connection.establish ()
repository.getData(
onSuccess = { data ->
continvation.resume(data, onCancellation = ::closeConnectionEmergency)
connection.close()
},
onError = continuation::cancel
)
}
}
}
}
}
Вроде все правила соблюдены: внутри IO контекста закрываем соединение в случае, если на каком-то этапе произошла ошибка. Но как читатель уже мог догадаться, вряд ли бы он тут видел этот пример, если бы в нём не было подвоха.
Но первое, что нужно понимать при работе с корутинами —
launch { ... }
не синхронный.
Лямбда, передаваемая внутрь, это инструкции, которые будут исполнены в ближайшем будущем, но не сейчас!
Также следует обратить внимание, что присутствует условие, при котором возможно терминальное закрытие экрана ещё до начала его работы. Можно подумать: «Ну и что, закрытие приведёт к отмене корутины, а у нас этот случай предусмотрен». И всё бы хорошо, если бы не одно НО.
There is no guarantee on the execution context in which the 'handler' will be invoked.
Всё дело в том что если корутину успеть отменить ещё до начала её исполнения, то onCancel вызовется на том потоке, на котором был отменён её scope. А значит не будет соблюден контракт @WorkerThread, который указан у SocketConnection и как следствие возможны ошибки в зависимости от реализации.
Be gentle with your transactions
Ну и напоследок моё любимое. Чтобы не ограничивать разработчиков стандартными языковыми конструкциями и позволить самому создавать свой проблемно-ориентированный язык, Kotlin предоставляет огромные возможности по расширению языковых конструкций. Например, передача лямбда-выражения в качестве аргумента функции позволяет выносить тело функции в отдельный блок за вызов функции и написать следующее.
class AppSqlHelper : SQLiteOpenHelper() {
suspend fun startTransaction(block: suspend (Transaction) -> Unit) {
val t = Transaction()
try {
block.invoke(t)
} catch (e: Exception) {
t.rollback()
throw e
}
}
inner class Transaction {
init {
sql.beginTransaction()
}
fun closeSuccessfully() {
sql.commitTransaction()
}
fun rollback() {
sql.rollbackTransaction()
}
fun saveMessage(message: ChatMessage) {
// perform sql query
}
}
}
class ChatRepository {
private val db = Database()
private val dbScope = CoroutineScope(Dispatchers.IO) + SupervisorJob()
fun saveUserChat(userChat: ChatData) {
dbScope.launch {
db.saveChatInfo(userChat.info)
db.startTransaction { transaction ->
for (message in userChat.messages) {
transaction.saveMessage(message)
}
transaction.closeSuccessfully)
}
}
}
}
В таком виде читать код гораздо приятнее, всю рутинную работу по отмене транзакций скрыли, видна только сама суть. Right? Wrong!
Во-первых, транзакция перестала быть синхронной, поскольку внутри себя теперь содержит suspend функцию.
Во-вторых, это означает, что на текущем потоке могут быть вызваны другие suspend-методы, которые в этот момент тоже что-то пишут в базу данных, а значит попадут в текущую транзакцию. И в случае возникновения исключения, могу лишь пожелать удачи тому, кто будет думать о том, что на самом деле произошло и что стало с целостностью данных.
По закону Мёрфи, если существует вероятность неблагоприятного события, то оно обязательно настанет, а значит страхуйте своих тиммейтов, вникайте в суть написанного кода.
Рекомендованные статьи:
Также подписывайтесь на Телеграм-канал Alfa Digital — там мы постим новости, опросы, видео с митапов, краткие выжимки из статей, иногда шутим.