Apache Flink: динамическое определение выходного топика в Kafka

Всем привет, меня зовут Александр Бобряков. Я техлид в команде МТС Аналитики, занимаюсь Real-Time обработкой данных. Мы начали использовать фреймворк Apache Flink, и я решил поделиться на Хабре своим опытом внедрения этой технологии в цикле статей.

В предыдущей статье — «Apache Flink. Как работает дедупликация данных в потоке Kafka-to-Kafka?» — я рассказывал про построение пайплайна Kafka-to-Kafka с промежуточным разделением потока и дедупликацией событий. Также разобрались, что такое состояние оператора и зачем оно нужно.

В этой статье добавим возможность динамического определения топика в Kafka для каждого события, куда его нужно записать.

aec7069d5fd3b12c34201feeb9363fe8.jpgСписок моих постов про Flink

Весь разбираемый исходный код можно найти в репозитории. Этот пост соответствует релизной ветке с названием release/3_WrappedSinkMessage.

Оглавление статьи

Постановка задачи динамически определяемого топика Kafka Sink

В предыдущей статье мы построили основной пайплайн, но все итоговые события ProductMessage попадали в единственный общий топик. Теперь немного усложним логику. Напомню, что входное сообщение ClickMessage имеет вид:

@Data
@Builder
@Jacksonized
@JsonIgnoreProperties(ignoreUnknown = true)
public class ClickMessage {
   @JsonProperty(required = true)
   @JsonPropertyDescription("User id")
   private UUID userId;

   @JsonPropertyDescription("Clicked object")
   private String object;

   @JsonPropertyDescription("User platform")
   @JsonDeserialize(using = Platform.Deserializer.class)
   private Platform platform;

   @JsonPropertyDescription("Product name")
   private String productName;

   @JsonPropertyDescription("Product topic")
   private String productTopic;

   @JsonProperty(required = true)
   @JsonPropertyDescription("Timestamp")
   private Long timestamp;

   @JsonPropertyDescription("Additional data")
   private Map data;
}

Теперь бизнес-клиентам захотелось бы записывать соответствующее сообщение ProductMessage (полученное преобразованием над ClickMessage) в топик, указанный в поле ClickMessage.productTopic, чтобы каждый продукт (клиент) мог подключаться и читать сообщения только для себя.

Статическая реализация определения топика Kafka Sink

Также в предыдущей части мы реализовали Kafka Sink. Давайте вспомним как он выглядел (за подробными объяснениями кода обратитесь к предыдущей статье), и доработаем его:

@Component
@RequiredArgsConstructor
public class ProductMessageKafkaSinkProvider implements SinkProvider {
   private final KafkaProperties kafkaProperties;
   private final SerializationSchema serializationProductMessageSchema;

   @Override
   public Sink createSink() {
       return KafkaSink.builder()
                  .setBootstrapServers(kafkaProperties.getBootstrapServers())
                  .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                                           .setTopic(kafkaProperties.getTopics().getProductTopic())
                                           .setValueSerializationSchema(serializationProductMessageSchema)
                                           .build())
                  .setDeliveryGuarantee(NONE)
                  .build();
   }
}

В дополнение к этому Kafka Sink предоставляет класс-сериализатор, который отвечает за способ сериализации сообщения ProductMessage перед отправкой в топик Kafka с помощью Jackson:

@Component
@RequiredArgsConstructor
class ProductMessageSerializationSchema implements SerializationSchema {
   private static final long serialVersionUID = 1;

   private transient ObjectMapper objectMapper;

   @Override
   public void open(InitializationContext context) {
       objectMapper = createObjectMapper();
   }

   @Override
   @SneakyThrows
   public byte[] serialize(ProductMessage element) {
       return objectMapper.writeValueAsBytes(element);
   }
}

В Kafka Sink мы статически определяем выходной топик setTopic (). Он задан в наших application.yml настройках. Теперь это поведение нужно изменить — Kafka Sink должен динамически определить топик, в который нужно записать событие ProdcutMessage.

Для начала нам поможет метод setTopicSelector () вместо предыдущего setTopic (). Реализация TopicSelector динамически определяет  топик на основе текущего записываемого события, определяемого в лямбда-функции.

/**
 * Selects a topic for the incoming record.
 *
 * @param  type of the incoming record
 */
@PublicEvolving
public interface TopicSelector extends Function, Serializable {}

Проблема в том, что в выходном ProductMessage нет информации об этом топике.  А топик хранится в поле входного сообщения ClickMessage.productTopic. Значит — нам нужна обертка, которая будет содержать сам ProductMessage и важную мета-информацию.

WrappedSinkMessage — обертка над отправляемым событием

Чтобы сохранить информацию о топике и самом событии сделаем обертку WrappedSinMessage:

@Getter
@EqualsAndHashCode
@RequiredArgsConstructor
public class WrappedSinkMessage implements Serializable {
   private static final long serialVersionUID = 1L;

   private final Meta meta;
   private final T message;

   @Getter
   @EqualsAndHashCode
   @RequiredArgsConstructor
   public static class Meta implements Serializable {
       private static final long serialVersionUID = 1L;

       public final String targetTopicName;
   }
}

В ней мы храним само целевое сообщение ProductMessage для отправки в Sink, а также дополнительную meta-информацию — в нашем случае только целевой топик targetTopicName. Поэтому перед непосредственным применением sinkTo () в пайплайне нам нужно будет привести входное сообщение ClickMessage к этой обертке. В поле метаданных можно также хранить любую нужную вам информацию. Например, мы добавляем в него данные, требующиеся для метрик, такие как latency обработки, типы обрабатываемых событий и так далее.

Забегая вперед, скажу, что такой подход — обертка для события еще на старте — очень хорошо себя зарекомендовал. Потому что во многих задачах нужно «нести» информацию о текущем событии до самого конца пайплайна, изменяя исходное событие сколько угодно раз

Динамическая реализация определения топика Kafka Sink

Теперь нужно изменить Kafka Sink и алгоритм сериализации сообщения, ведь к нам теперь поступает WrappedSinMessage с целевымProductMessage внутри. Взглянем на изменение сериализатора:

@Component
@RequiredArgsConstructor
class WrappedProductMessageSerializationSchema implements SerializationSchema> {
   private static final long serialVersionUID = 1;

   private transient ObjectMapper objectMapper;

   @Override
   public void open(InitializationContext context) {
       objectMapper = createObjectMapper();
   }

   @Override
   @SneakyThrows
   public byte[] serialize(WrappedSinkMessage element) {
       return objectMapper.writeValueAsBytes(element.getMessage());
   }
}

В классе изменилась только сериализация непосредственно внутреннего ProductMessage в строчке element.getMessage (). Тут все довольно прозрачно. А вот в самом создании Kafka Sink мы имеем доступ к WrappedSinkMessage, из которого легко получить сам целевой топик:

@Component
@RequiredArgsConstructor
public class WrappedProductMessageKafkaSinkProvider implements SinkProvider> {
   private final KafkaProperties kafkaProperties;
   private final SerializationSchema> serializationProductMessageSchema;

   @Override
   public Sink> createSink() {
       return KafkaSink.>builder()
                  .setBootstrapServers(kafkaProperties.getBootstrapServers())
                  .setRecordSerializer(KafkaRecordSerializationSchema.>builder()
                                           .setTopicSelector(wrappedMessage -> wrappedMessage.getMeta().getTargetTopicName())
                                           .setValueSerializationSchema(serializationProductMessageSchema)
                                           .build())
                  .setDeliveryGuarantee(NONE)
                  .build();
   }
}

Основное изменение только в строке с методом setTopicSelector () с приведением остальных типов из ProductMessage к WrappedSinkMessage. Значит, можем  убрать в рамках репозитория настройку kafka.topics.product-topic.

Трансформация входного события к обертке

Итак, у нас был класс для трансформации входного ClickMessage в выходной ProductMessage. А сейчас нам нужно преобразовать ClickMessage в целевой WrappedSinkMessage. Для этого мы совсем немного дополним предыдущую реализацию:

@Slf4j
@RequiredArgsConstructor
public class ClickMessageToWrappedProductSinkMessageMapFunction implements FlatMapFunction> {
   private static final long serialVersionUID = 1L;

   @Override
   public void flatMap(ClickMessage clickMessage, Collector> out) {
       try {
           final var productMessage = ProductMessage.builder()
                                          .userId(clickMessage.getUserId())
                                          .productName(clickMessage.getProductName())
                                          .object(clickMessage.getObject())
                                          .platform(clickMessage.getPlatform())
                                          .timestamp(clickMessage.getTimestamp())
                                          .build();
           final var wrappedMessage = new WrappedSinkMessage<>(
               new WrappedSinkMessage.Meta(clickMessage.getProductTopic()),
               productMessage);
           out.collect(wrappedMessage);
       } catch (Exception e) {
           log.error("Error converting ClickMessage to ProductMessage", e);
       }
   }
}

Также в коде Flink Job ClickToProductJob необходимо теперь инжектить бин SinkProvider>.

Вывод

Вот мы и мы разобрались как динамически определять целевой топик для записи события в Kafka.

Теперь нужно все это протестировать. В следующей статье займемся Unit-тестированием всех операторов джобы. Описанные там подходы к тестированию будут применимы и в любых других приложениях, не связанных с Flink. Затем перейдем к тестированию всей Flink Job, а также E2E-тестированию.

Если остались вопросы — задавайте в комментариях!

© Habrahabr.ru