⚡️ Concurrency Patterns. Fan In
В предыдущем посте мы немного напутали с определением такого многопоточного паттерна как Fan Out
На деле мы показывали Tee, который распространяет одно и то же значение V из канала-источника на N каналов-потребителей
Отличие Fan Out от Tee в том, что на N каналов распространяются разные значения из одного канала-источника. Тобишь, воркеры тянут значения из одного канала, борясь за них насмерть
Лирическое отступление закончено, наша совесть чиста, а сегодняшняя тема будет посвящена Fan In
Этот паттерн является обратным для Fan-Out. Мы собираем данные из нескольких каналов-источников и направляем их в один общий канал-потребитель
1. Default
Итак, что мы имеем?
— Есть воркеры — они кладут значения в N каналов и являются продьюсерами
— Каждый из этих N каналов будет получать значения от своего продьюсера. Назовем такие каналы "стоковыми"
— Есть один общий канал out, туда будет нужно отправить все значения из стоковых каналов
— Для этого мы запускаем N потоков, каждый из которых слушает свой стоковый канал, куда кладет значения продьюсер и редиректит все значения в out
Playground пример
Но что же будет, если какой-то наш продьюсер потух и больше не шлет никаких значений, а передаггый контекст не отменен? Как бы нам понять, что воркер не является активным и перезапустить его? — в этом нам поможет такой механизм как "Heartbeats"
Heartbeat — это регулярное сообщение от продьюсера/воркера, подтверждающее, что он жив и работает
2. Heartbeats
Приступим к рассмотрению этого чуда!
Основная идея проста:
— Имеем структуру, которая хранит в себе стоковый канал, используемый как пайп между воркером и стоком, и канал "сердцебиений"
— Функция Supervise ответственна за отслеживание "сердцебиений" и перезапуск воркера при их отсутствии по TTL
— Функция FanIn принимает на вход стоковые каналы и возвращает результирующий канал, из которого можно читать данные
Всмотримся в наши функции поподробнее
2.1. FanIn
— Не отклоняемся от цели: выкачиваем данные из стоковых каналов и перекладываем в out, реагируя на контекст и неблокирующе отправляя "сердцебиение" нашему супервизору, который пристально наблюдает за нашими воркерами
— WaitGroup здесь так же используется для того, чтобы дождаться конца работы наших стоков и отдать управление основному потоку
после дренажа всех "живых" значений
2.2 Supervise
— Создаем стоковый канал и канал "сердцебиений", агрегируем эти значения в структуре Source и возвращаем ее
— В отдельном потоке запускаем нашу рутину по отслеживанию и перезапуску воркеров
2.2.1 Смотрим на внутренности запущенного потока внутри Supervise
— Изначально происходит создание дочернего контекста с отменой для нашего воркера. Этот контекст будет рулить в тот момент, когда наш TTL пройдет и надо будет потушить воркера
— Создаем ticker, который будет слать ивенты, семантически значащие следующее: "в нашего воркера стреляли и он упал в лужу на..."
После получения ивента мы отменяем контекст и воркер окончательно "задыхается в луже"
— Первично запускаем работягу в отдельном потоке
— Если ловим ивент от тикера: производим отмену контекста, переназначаем этот же контекст и функцию отмены, сбрасываем таймер, и запускаем нового воркера в отдельном потоке
— В случае, когда из стокового потока нам пришло "сердцебиение" мы просто сбрасываем таймер и движемся дальше!
Стоит отметить, что воркеры должны "реагировать" на переданный контекст. Без этого мы получим утечку потоков и черт его знает, чем нам это грозит (профилированием и
устранением проблемы, которой можно было бы и избежать)
Таким образом, мы получаем более надежный Fan In, где все источники данных контролируемы и восстанавливаемы при зависаниях
Playground пример
Статью писали с Дашей: @dariasroom
Stay tuned 😏