Возможно Вы что-то слышали о том как устроена параллелизация программ в Scala, быть может именно поэтому Вы и заинтересовались Scala. Средства, предоставляемые Scala, очень сильно упрощают задачу построения параллельных приложений, в сравнении с теми низкоуровневыми интерфейсами, что доступны в большинстве других языков.
Параллельные вычисления в Scala основаны на двух столбах, один из них — Future
,
другой — Actor
. О первом мы и поговорим в этой статье. Я объясню чем хорош
этот тип и как с ним работать в функциональном стиле.
Убедитесь в том, что у вас установлен компилятор Scala версии не ниже 2.9.3, иначе примеры не будут работать. Этот тип появился в версии 2.10 и был портирован для 2.9.3, изначально в немного другом виде он входил в состав библиотеки для параллельных вычислений Akka.
Предположим, нам нужно приготовить капучино. Для этого просто выполним последовательность действий одно за другим:
- Помолем кофейные зёрна
- Вскипятим воду
- Сварим эспрессо, смешав молотые зёрна с кипятком
- Взобьём молоко
- Добавим молоко в эспрессо и капучино готов
Если перевести в Scala, мы получим следующее:
import scala.util.Try
// Определим осмысленные синонимы:
type CoffeeBeans = String
type GroundCoffee = String
case class Water(temperature: Int)
type Milk = String
type FrothedMilk = String
type Espresso = String
type Cappuccino = String
// Методы-заглушки для отдельных шагов алгоритма:
def grind(beans: CoffeeBeans): GroundCoffee = s"ground coffee of $beans"
def heatWater(water: Water): Water = water.copy(temperature = 85)
def frothMilk(milk: Milk): FrothedMilk = s"frothed $milk"
def brew(coffee: GroundCoffee, heatedWater: Water): Espresso = "espresso"
def combine(espresso: Espresso, frothedMilk: FrothedMilk): Cappuccino = "cappuccino"
// Исключения, на случай если что-то пойдёт не так
// (они понадобяться нам позже):
case class GrindingException(msg: String) extends Exception(msg)
case class FrothingException(msg: String) extends Exception(msg)
case class WaterBoilingException(msg: String) extends Exception(msg)
case class BrewingException(msg: String) extends Exception(msg)
// последовательно выполним алгоритм:
def prepareCappuccino(): Try[Cappuccino] = for {
ground <- Try(grind("arabica beans"))
water <- Try(heatWater(Water(25)))
espresso <- Try(brew(ground, water))
foam <- Try(frothMilk("milk"))
} yield combine(espresso, foam)
У такого подхода есть несколько преимуществ: у нас есть очень простой пошаговый рецепт, мы знаем что нам делать и мы не собьёмся, поскольку у нас нет необходимости в переключении контекста.
Но есть и свои минусы. Нам часто приходиться ждать впустую. Мы не можем ничего делать пока мы ждём приготовления зёрен. Ведь мы собираемся перейти к следующему шагу только тогда, когда закончиться предыдущий.
Очевидно, что мы теряем много времени. Скорее всего нам захочется начать сразу несколько действий из этого списка одновременно. Как только зёрна и вода будут готовы, мы начнём делать эспрессо, по ходу взбивая молоко.
Точно так же дела обстоят и в программировании. Множество потоков ждут ответа от веб-сервера. Мы не хотим, чтобы все они были заблокированы выполнением какого-нибудь запроса к базе или вызовом HTTP-сервиса. В этом случае мы можем воспользоваться асинхронными вычислениями и неблокирующим IO, так чтобы при вызове одним из запросов базы данных, веб-сервер мог заниматься другими запросами, а не простаивать в пустую в ожидании ответа от базы.
“Я слышал, Вам нравятся функции обратного вызова, и я поместил одну функцию обратного вызова в другоую!”
Конечно, Вы наслышаны обо всём этом через шумиху вокруг Node.js. Подход Node.js заключается в использовании функций обратного вызова (callback). К сожалению, при таком подходе код легко превращается в кашу из функций обратного вызова, в других функциях, которые сами в других функциях, которые .... в других функциях обратного вызова. Такой код крайне трудно читать и отлаживать.
В Scala мы тоже можем пользоваться функциями обратного вызова через Future
, но нам они не понадобятся.
Есть лучшие альтернативы.
“Я знаю о типе
Future
. Он совершенно бесполезен!”
Вам, возможно, встречалась реализация Future
в Java. Всё что мы можем делать с Future
в Java
так это проверить завершилось оно или нет, или блокировать вычисления до завершения. Этот тип
практически бесполезен и работать с ним не так сладко.
Но Вы будете приятно удивлены тем, как устроен тип Future
в Scala. Так приступим!
Тип Future[T]
, определённый в scala.concurrent package — это тип коллекция, представляющий вычисление, которое
когда-нибудь закончится и вернёт значение типа T
. Вычисление может закончиться с ошибкой или не буть вычисленным
в поставленные временные рамки. Если что-то пойдёт не так, то результат будет содержать исключение.
Future
это контейнер для однократной записи, когда вычисление будет завершено, значение этого типа неизменяемое.
Также Future
предоставляет методы, позволяющие считать вычисляемое значение. Запись значения осуществляется
с помощью типа Promise
. Эти понятия чётко разделены в интерфейсе. Данная статья посвящена Future
, а о Promise
мы поговорим в следующей.
Существует несколько способов использования Future, мы посмотрим на них на нашем кофейном примере.
Во первых, нам нужно переписать все функции для отдельных шагов так, чтобы они
сразу возвращали результат в виде Future
, а не блокировали бы вычисления.
import scala.concurrent.future
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Random
def grind(beans: CoffeeBeans): Future[GroundCoffee] = Future {
println("start grinding...")
Thread.sleep(Random.nextInt(2000))
if (beans == "baked beans") throw GrindingException("are you joking?")
println("finished grinding...")
s"ground coffee of $beans"
}
def heatWater(water: Water): Future[Water] = Future {
println("heating the water now")
Thread.sleep(Random.nextInt(2000))
println("hot, it's hot!")
water.copy(temperature = 85)
}
def frothMilk(milk: Milk): Future[FrothedMilk] = Future {
println("milk frothing system engaged!")
Thread.sleep(Random.nextInt(2000))
println("shutting down milk frothing system")
s"frothed $milk"
}
def brew(coffee: GroundCoffee, heatedWater: Water): Future[Espresso] = Future {
println("happy brewing :)")
Thread.sleep(Random.nextInt(2000))
println("it's brewed!")
"espresso"
}
Что же происходит в примере? Поясним моменты связанные с Future
.
Во первых, в объекте-компаньоне для Future
определён метод apply
,
он принимает два аргумента:
object Future {
def apply[T](body: => T)(implicit execctx: ExecutionContext): Future[T]
}
В параметр body
по имени передаётся вычисление, которое будет выполняться асинхронно.
Второй параметр — контекст вычисления, заключённый в отдельный список параметров, является имплицитным (implicit),
это означает, что мы можем не передавать его явно, если значение с таким типом определено
в той же области видимости переменных. В случае Future
для этого мы импортируем глобальный
контекст вычисления.
Значение типа ExecutionContext
это то, что нужно для выполнения асинхронных вычислений, мы
можем представить что это пул потоков. Поскольку этот параметр передаётся неявно, в нашем методе apply
остаётся лишь один аргумент. Для передачи одного параметра в Scala мы можем воспользоваться как круглыми
так и фигурными скобками. Очень часто для вызова Future
используются именно фигурные скобки,
словно это не обычный вызов, а применение некоторой встроенной функции языка. Неявная передача
ExecutionContext
происходит почти во всех методах интерфейса для Future
.
В нашем кофейном примере нам не приходится ничего вычислять, поэтому, мы просто
вставили задержку потока вычисления с некоторым произвольным временем.
Также мы выводим сообщения на печать до и после задержки, для того чтобы увидеть
неопределённость, присущую параллельным вычислениям в нашем примере.
Вычисление возвращаемого значения в Future
начнётся в некотором потоке из
ExecutionContext
в неопределённое время после создания значения типа Future
.
Иногда, функции обратного вызова очень хорошо подходят для простых случаев. Они могут
вызываться на Future
в виде частично определённых функций. Мы можем передать функцию
обратного вызова методу onSuccess
и она будет вызвана, только когда вычисление закончиться
успешно. Функция принимает значение, которое должно когда-нибудь вернуть Future
:
receives the computed value as its input:
grind("arabica beans").onSuccess { case ground =>
println("okay, got my ground coffee")
}
Также с помощью метода onFailure
мы можем вызвать какую-нибудь функцию, если случилось исключение.
Функция обратного вызова принимает значение типа Throwable
, но она будет вызвана
только в том случае, если Future
не удастся вычислить значение.
Обычно, лучше всего одновременно объявить действия для того и другого случая.
Тогда общая функция будет принимать значение типа Try
:
import scala.util.{Success, Failure}
grind("baked beans").onComplete {
case Success(ground) => println(s"got my $ground")
case Failure(ex) => println("This grinder needs a replacement, seriously!")
}
Поскольку мы передаём бобы вместо зёрен, метод grind
закончиться с ошибкой, что приведёт к
Failure
в Future
.
Функции обратного вызова могут быть очень неуклюжими, если нам приходиться
писать вложенные функции обратного вызова. К счастью, нам не нужно этого делать!
Настоящая сила Future
в Scala в том, что их можно комбинировать.
Вы скорее всего заметили, что все коллекции, что встретились нам предоставляют
интерфейс из методов map
, flatMap
и др. Мы можем строить их с помощью for
-генераторов.
Но Future
также является типом-коллекцией. Поэтому в Scala мы можем пользоваться всеми
этими методами и для Future
.
Но вот вопрос: что означают все эти операции для значения, которое даже ещё не было вычислено?
Хотели бы вы путешествовать во времени? Узнать о том, что даже ещё не случилось?
Scala даёт вам такую возможность! Предположим, что когда закипит вода, мы хотим
проверить температуру. Мы можем сделать это преобразовав Future[Water]
в Future[Boolean]
:
val temperatureOkay: Future[Boolean] = heatWater(Water(25)).map { water =>
println("Мы в будущем!")
(80 to 85).contains(water.temperature)
}
Результат этого выражения Future[boolean]
, когда-нибудь в будущем вернёт вычисленное
логическое значение. Измените определение heatWater
так, чтобы оно возвращало
исключение (например кастрюля с водой могла взорваться) и вы убедитесь в том, что
сообщение "Мы в будущем!"
не будет напечатано.
Внутри функции, которую мы передаём в map
, будущее уже наступило или точнее предполагаемое будущее.
Эта функция выполняется как только значение Future[Water]
будет успешно вычислено. Однако время может
пойти и по-другому пути. Если Future[Water]
завершиться с ошибкой, будущее для этой функции никогда
не наступит. Вместо этого результатом вызова будет значение типа Future[Boolean]
, содержащее
Failure
.
Если одно асинхронное вычисление зависит от другого асинхронного вычисления, то нам понадобится
метод flatMap
. Так мы не заблудимся в дебрях вложенных Future
.
Предположим, что процесс для измерения температуры необходимо некоторое время,
поэтому проверка температуры должна происходить асинхронно. У нас есть функция,
что принимает воду (Water
) и возвращает Future[Boolean]
:
def temperatureOkay(water: Water): Future[Boolean] = Future {
(80 to 85).contains(water.temperature)
}
Теперь вместо map
для проверки воды после кипения мы воспользуемся методом flatMap
,
так мы получим Future[Boolean]
, а не Future[Future[Boolean]]
:
val nestedFuture: Future[Future[Boolean]] = heatWater(Water(25)).map {
water => temperatureOkay(water)
}
val flatFuture: Future[Boolean] = heatWater(Water(25)).flatMap {
water => temperatureOkay(water)
}
Функция, переданная в flatMap
, будет вызвана только в том случае если значение в Future[Water]
будет
вычислено успешно.
Зачастую вызов flatMap
можно заменить более простым выражением с for
-генераторами.
Код станет гораздо более наглядным. Перепишем наш пример:
val acceptable: Future[Boolean] = for {
heatedWater <- heatWater(Water(25))
okay <- temperatureOkay(heatedWater)
} yield okay
Если у нас есть несколько параллельных вычислений, необходимо следить за тем чтобы
соответствующие им Future
создавались за пределами for
-генратора.
Так следующее выражение будет выполняться последовательно:
def prepareCappuccinoSequentially(): Future[Cappuccino] = {
for {
ground <- grind("arabica beans")
water <- heatWater(Water(20))
foam <- frothMilk("milk")
espresso <- brew(ground, water)
} yield combine(espresso, foam)
}
Этот код прекрасно выглядит, но поскольку это всего лишь представление
вложенных вызовов метода flatMap
, это означает, что значение Future[Water]
,
созданное в heatWater
, создаётся только тогда, когда Future[GroundCoffee]
будет успешно вычислено. Мы можем проверить это по выводу сообщений на печать.
Поэтому не забывайте о том, что необходимо создавать независимые Future
за пределами for
-генератора:
def prepareCappuccino(): Future[Cappuccino] = {
val groundCoffee = grind("arabica beans")
val heatedWater = heatWater(Water(20))
val frothedMilk = frothMilk("milk")
for {
ground <- groundCoffee
water <- heatedWater
foam <- frothedMilk
espresso <- brew(ground, water)
} yield combine(espresso, foam)
}
Теперь все три асинхронных вычисления начнутся одновременно.
в этом можно убедиться по выводу сообщений на печать. Видно, что
порядок их появления не определён. Единственное в чём мы можем быть уверены,
так это в том, что сообщение "happy brewing"
будет появится последним.
Поскольку метод, ответственный за его печать, зависит от двух других
асинхронных вычислений. Он может быть вызван только внутри for
-генератора,
когда предыдущие вычисления успешно завершатся.
Вы скорее всего обратили внимание на то, что Future[T]
делает акцент на
успешном вычислении значения, благодаря этому мы можем пользоваться методами
map
, flatMap
, filter
и др, подразумевая успешное завершение вычислений.
Иногда нам хочется воспользоваться функциональным стилем работы с Future
и в другой альтернативе, в которой случаются исключения. Вызовом метода failed
на Future
мы можем получить проекцию исключений Future[Throwable]
и вызвать метод map
,
который будет выполнен только в том случае, если что-то пойдёт не так.
Мы посмотрели на Future
и оно прекрасно! Работать с этим типом — одно удовольствие, ведь
он поддерживает все методы для коллекций. Мы можем комбинировать значения в функциональном стиле.
Мы можем очень легко превратить блокирующий код в асинхронный, завернув его в Future
.
Однако, лучше всего начинать с асинхронных вычислений. Мы можем пообещать (promise),
что когда-нибудь вернём значение. На языке Scala это означает, создать значение типа Promise
,
которое завершится в будущем (создаст значение в типе Future
). Об этом мы и поговорим в следующей статье.