
Talk about the three highs in the integration of flink streaming and batching




     The "three highs" mentioned here are not "three heights" (😂). Throughout the history of software development there have always been three pursuit goals: high performance, high concurrency, and high availability, commonly known as the "three highs". The three are distinct but related. Looking at the history of software development, I personally think the progression has been high performance (single core) -> high concurrency (multi-core) -> high availability (multi-machine).
     High performance came first. From the very old DOS, a single-user operating system, to the later Windows 98 as a multi-tasking system (actually a single process), and later UNIX, a true multi-user, multi-tasking operating system (multi-process and multi-threaded in the real sense), the goal was always how to run programs with the highest possible performance on a single CPU.
     The second is high concurrency. What needs to be broken here is the processing limit of a single machine. If the CPU or memory of a single machine is insufficient, its hardware configuration can be increased through vertical scaling to maximize concurrency; single-core becomes multi-core, so that multiple tasks can truly run in parallel, thereby reducing the overall time cost of processing.
     Then, when single-machine processing reaches its bottleneck, the system must scale horizontally, that is, become distributed, and this is where high availability comes in: the possibility of a single node going down must be considered. The industry's usual practice of deploying middleware on 3 nodes is designed precisely for high availability.

(Figure: Flink architecture and process)

All distributed RPC communication in Flink relies on the Akka framework (a high-performance, fault-tolerant framework for distributed concurrent applications).

High Performance

Distributed Cache

     In batch computing, most of the data to be processed comes from external files, which may live in systems such as HDFS or S3. Unlike MapReduce, Flink does not always move the computation to where the data is, so in most cases files are frequently copied over the network. For frequently used files, Flink therefore provides an internal distributed cache mechanism: the file is placed on each computing node so that user functions can conveniently read it locally in parallel, avoiding repeated pulls by tasks on the same TaskManager. The mechanism works as follows: the program registers a file or directory (on a local or remote file system, such as HDFS or S3) with the ExecutionEnvironment and gives it a name. When the program is executed, Flink automatically copies the file or directory to the local file system of every TaskManager node; this copy is performed only once. User functions can then look up the file or directory by the registered name and access it from the TaskManager's local file system. In effect, the distributed cache is the equivalent of broadcast mode in a message queue: a variable is broadcast to all TaskManagers, except that here it is a file. This effectively avoids situations where file contents have to be fetched repeatedly over the network, thereby improving the execution efficiency of the whole job.
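
A minimal sketch of this API (the HDFS path and the name "dict" are placeholders), registering a file once and reading it locally inside a rich function:

```java
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

import java.io.File;

public class DistributedCacheExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Register a (local or remote, e.g. hdfs:// or s3://) file under a logical name.
        // Flink copies it once to the local file system of every TaskManager.
        env.registerCachedFile("hdfs:///data/dict.txt", "dict");

        env.fromElements("a", "b", "c")
           .map(new RichMapFunction<String, String>() {
               @Override
               public void open(Configuration parameters) throws Exception {
                   // Look the file up by its registered name on the TaskManager's local file system.
                   File dict = getRuntimeContext().getDistributedCache().getFile("dict");
                   // ... load the dictionary into memory here ...
               }

               @Override
               public String map(String value) {
                   return value;
               }
           })
           .print();
    }
}
```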

Network Buffer

     The network buffer is the buffer used in network transmission (it is actually also used for non-network, local exchanges). Readers familiar with the Flink network stack will know that Flink transfers data between upstream and downstream tasks over the network with a design similar to the producer-consumer model. Without this buffer, the producer or consumer would spend a lot of time waiting: the downstream waiting to receive data and the upstream waiting to send it. With the buffer, short-lived jitter on one side will, in theory, have little impact on the data processing of the other side.
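
The following is not Flink's network stack, just a plain JDK analogy of how a bounded buffer decouples the two sides: each side only blocks when the buffer is completely full or completely empty, so short jitter on one side is absorbed.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BufferedHandover {
    public static void main(String[] args) {
        // A bounded buffer between the producer (upstream task) and the consumer (downstream task).
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(1024);

        Thread producer = new Thread(() -> {
            for (int i = 0; i < 10_000; i++) {
                try {
                    buffer.put(i);              // blocks only when the buffer is full
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });

        Thread consumer = new Thread(() -> {
            for (int i = 0; i < 10_000; i++) {
                try {
                    Integer record = buffer.take(); // blocks only when the buffer is empty
                    // ... process record ...
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });

        producer.start();
        consumer.start();
    }
}
```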


There are three situations in Flink in which the Netty server consumes the buffer:

Let's talk about what back pressure is, as shown below:

(Figure: back pressure between the Sender and the Receiver)

     Under normal circumstances, if the message processing speed of the receiver is greater than or equal to the message sending speed of the sender, there is no message congestion and the system runs smoothly. But when messages are sent too fast and received too slowly, messages pile up. A mechanism that can automatically reduce the sending speed in this situation is the back pressure mechanism.

    How does Flink trade off between throughput and latency? In a streaming system, when the speed of downstream consumption cannot keep up with the speed of upstream production, this phenomenon is called back pressure. When back pressure occurs, it is reasonable to limit the speed of the upstream producers so that the downstream can keep up. Back pressure increases the data latency of stream processing jobs and also affects checkpointing. Flink natively supports streaming: it can process each record as it arrives, instead of waiting for a complete micro-batch as Spark Streaming does. However, to improve throughput, Flink by default does not flush every single record downstream immediately. How is this controlled? Flink uses an efficient, bounded, distributed blocking queue.
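
As a minimal sketch of how this throughput/latency trade-off can be tuned on the user side (the timeout value here is only an example), Flink exposes a buffer timeout on the streaming environment: output buffers are flushed either when full or when the timeout expires, whichever comes first.

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BufferTimeoutExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Lower timeout = lower latency; higher timeout = fuller buffers and better throughput.
        env.setBufferTimeout(10);
        // env.setBufferTimeout(0); // flush after every record: lowest latency, lowest throughput

        env.fromElements(1, 2, 3)
           .map(x -> x * 2)
           .returns(Types.INT)   // help type extraction for the lambda
           .print();

        env.execute("buffer timeout demo");
    }
}
```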

(Figure: credit-based flow control between upstream and downstream subtasks)

    For example, suppose that after the upstream Subtask S2 sends data there are still 4 buffers backlogged; then, along with the data, backlog size = 4 is sent to the downstream Subtask S5. After receiving the data, the downstream knows there is a backlog of 4 buffers upstream, so it applies for buffers from its buffer pool. Due to limited capacity, the downstream InputChannel currently has only 2 free buffers, so Subtask S5 feeds back Channel Credit = 2 to the upstream Subtask S2. The upstream will then send at most 2 buffers' worth of data downstream next time, so that the amount of data sent each time never exceeds what the downstream InputChannel's buffers can hold. Through this feedback strategy, it is ensured that data does not accumulate in the shared Netty and TCP layers and therefore does not affect the communication of other subtasks.
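
The following is only an illustrative sketch of the idea, not Flink's internal implementation (class and field names are invented): the receiver announces its free buffers as credit, and the sender never sends more than that.

```java
public class CreditBasedSketch {

    static class Receiver {
        private int freeBuffers = 2;           // e.g. the downstream InputChannel has 2 free buffers

        int announceCredit() {                 // "Channel Credit = 2"
            return freeBuffers;
        }

        void receive(int buffers) {
            freeBuffers -= buffers;            // these buffers are now occupied
            // ... process the data and eventually recycle buffers, increasing freeBuffers again ...
        }
    }

    static class Sender {
        private int backlog = 4;               // e.g. 4 buffers are queued upstream

        void send(Receiver receiver) {
            int credit = receiver.announceCredit();
            int toSend = Math.min(backlog, credit);   // never exceed the receiver's capacity
            receiver.receive(toSend);
            backlog -= toSend;
            System.out.println("sent " + toSend + " buffers, backlog left: " + backlog);
        }
    }

    public static void main(String[] args) {
        new Sender().send(new Receiver());     // prints: sent 2 buffers, backlog left: 2
    }
}
```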

How to locate back pressure: in practice this is usually done through the back pressure monitoring in the Flink Web UI or by watching the input/output buffer usage metrics of each task.

    For memory management, Flink implements its own independent mechanism (largely based on off-heap memory), which has the advantage of minimizing the impact of JVM GC on the system. The Java ecosystem provides many serialization frameworks: Java serialization, Kryo, Apache Avro, and so on, but Flink implements its own serialization framework. Through serialization and deserialization, Flink converts data objects into binary form and stores them in memory. This reduces the size of the stored data, makes more effective use of memory space, and reduces the performance degradation or task failures caused by GC. As a result, Flink is more stable than distributed frameworks that simply share JVM heap memory, and its operation is far less likely to be disrupted by JVM GC and similar problems.
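
A small, hedged example of how a user can interact with this on the API side (MyEvent is a hypothetical class): disabling the generic Kryo fallback makes inefficiently serialized types visible early, and explicitly registering a type keeps Kryo output compact when the fallback is unavoidable.

```java
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SerializationConfigExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        ExecutionConfig config = env.getConfig();

        // Fail fast if a type falls back to generic Kryo serialization instead of
        // Flink's own serializers (useful for spotting inefficient types early).
        config.disableGenericTypes();

        // If a custom type really has to go through Kryo, register it so that Kryo
        // writes a compact id instead of the full class name.
        // config.registerKryoType(MyEvent.class);   // MyEvent is a hypothetical user class
    }
}
```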

High Concurrency

Akka Framework

    As mentioned earlier, all distributed RPC communication in Flink relies on the Akka framework; Flink adopted Akka for its distributed communication starting with version 0.9. With Akka, all remote procedure calls (RPC) are implemented as asynchronous messages. This mainly affects the three components JobManager, TaskManager, and JobClient. In the future, it is likely that more components will be converted into actors, allowing them to send and process asynchronous messages.
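
The snippet below is not Flink's internal RPC code; it is just a minimal classic Akka (Java API) sketch of what "RPC as asynchronous messages" looks like (the Greeter actor and system name are invented): tell() returns immediately, and the actor processes the message later on its own thread.

```java
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;

public class Greeter extends AbstractActor {
    @Override
    public Receive createReceive() {
        return receiveBuilder()
            .match(String.class, msg -> System.out.println("got: " + msg))
            .build();
    }

    public static void main(String[] args) {
        ActorSystem system = ActorSystem.create("demo");
        ActorRef greeter = system.actorOf(Props.create(Greeter.class), "greeter");

        greeter.tell("hello", ActorRef.noSender());  // asynchronous, fire-and-forget
        // in a real program you would wait for processing before terminating
        system.terminate();
    }
}
```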

    Akka main features:
- Everything is an actor: concurrency is expressed through lightweight actors rather than shared-memory threads and locks.
- Communication happens purely through asynchronous, non-blocking message passing.
- Fault tolerance through supervision: a parent actor monitors its children and restarts them when they fail.
- Location transparency: local and remote actors are addressed in the same way, which makes distribution straightforward.

Low Latency

     This also essentially relies on the bounded distributed blocking queue, that is, the back pressure mechanism, to keep latency low and response times fast, thereby increasing the achievable concurrency.

    When it comes to concurrent execution, we have to talk about slots and parallelism in Flink. Let's first understand the relationship between the two.

     There are two TaskManagers in the image below, each with 2 slots, so the maximum parallelism of our operators can reach 4, and one or more subtasks can be executed in the same slot.

(Figure: two TaskManagers, each with 2 slots)

A slot represents the concurrent execution capacity of a TaskManager.

    As shown in the figure below, taskmanager.numberOfTaskSlots: 3 means that each TaskManager is allocated 3 TaskSlots, so 4 TaskManagers have a total of 12 TaskSlots.

(Figure: 4 TaskManagers with 3 TaskSlots each)

Parallelism refers to the concurrency capacity the TaskManagers actually use.

     As shown in the figure below, parallelism.default: 1 means the default parallelism of the running program is 1, so only 1 of the 12 TaskSlots is used and 11 are idle. Setting an appropriate parallelism can improve efficiency.

(Figure: default parallelism of 1, so only 1 of the 12 TaskSlots is in use)

The two concepts can be understood as follows: slots define the concurrency the cluster can offer, while parallelism defines how much of that capacity a job actually uses.

We try not to set the parallelism at the system level; instead, we set it per task. The priority when setting parallelism is: operator level > runtime environment level > client level > system level.
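
A minimal sketch of the last two levels (host, port, and parallelism values are arbitrary): the environment default applies to all operators unless an operator explicitly overrides it.

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelismExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Runtime environment level: default parallelism for all operators of this job.
        env.setParallelism(4);

        env.socketTextStream("localhost", 9000)
           .map(String::toUpperCase)
           .returns(Types.STRING)
           .setParallelism(2)      // operator level: overrides the environment default
           .print()
           .setParallelism(1);     // e.g. a single printing sink

        env.execute("parallelism demo");
    }
}
```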

High Availability

JobManager high availability

     Currently Flink high availability supports two cluster modes, Standalone Cluster and Yarn Cluster. Flink high availability is mainly aimed at the JobManager, because the JobManager is the management node of the entire cluster, responsible for task scheduling, task allocation, resource management, and so on. If the JobManager has a problem, new jobs cannot be submitted and existing jobs all fail, so the high availability guarantee of the JobManager is particularly important. Flink does not enable JobManager high availability by default; it has to be configured. (A small question left to the reader: what about Kubernetes?)

Standalone Cluster

     In the Standalone cluster mode, JobManager metadata is stored in ZooKeeper, and ZooKeeper completes the election of the JobManager leader. There are multiple JobManagers in the Standalone cluster at the same time, but only one provides services while the others are in standby state. When the active JobManager loses its connection (the node disappears, the network times out, and so on), ZooKeeper automatically elects a new JobManager from the standby list to take over the Flink cluster and provide services.
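
A typical flink-conf.yaml fragment for this setup might look roughly as follows (the quorum addresses, storage directory, and cluster id are placeholders; the standby JobManager hosts are additionally listed in the conf/masters file):

```yaml
# flink-conf.yaml (illustrative values)
high-availability: zookeeper
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
high-availability.storageDir: hdfs:///flink/ha/
high-availability.cluster-id: /standalone_cluster_1
```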


Yarn Cluster

     The high availability of the Flink Yarn Session cluster mode relies on Yarn itself, mainly because Yarn provides a certain fault-tolerance guarantee for the applications running on it. In Flink on Yarn mode the JobManager actually runs inside the container of the ApplicationMaster, and Yarn does not start multiple JobManagers: there is no ZooKeeper-style election deciding which standby becomes the leader. Instead, the high availability of the JobManager is ensured by Yarn restarting the failed ApplicationMaster (and with it the JobManager).
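
The number of restarts Yarn will attempt is driven by configuration; roughly, something like the following (the value is illustrative, and the effective limit is also capped by Yarn's own yarn.resourcemanager.am.max-attempts):

```yaml
# flink-conf.yaml (illustrative value)
yarn.application-attempts: 10
```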

Checkpoints and Savepoints mechanism

     We all know that individual components need high availability, and so do the tasks themselves, for example in abnormal situations or during machine expansion and upgrades: the computing state of a task needs a semantic guarantee, similar to a transactional guarantee. Flink provides two mechanisms for this, an automatic one (checkpoints) and a manual one (savepoints). So how does a savepoint differ from a checkpoint?

     Checkpoints are automatic save points: at regular intervals Flink injects checkpoint barriers (demarcation points) into the input data stream, and the barriers divide the data of each interval into corresponding save points, so that when the application encounters an abnormal situation it can restore the state of the task from the latest snapshot. The size of the current state is directly related to the application, however; if the current computing task is large, saving takes a long time and also affects throughput. The supported state backends are Memory, FileSystem, and RocksDB; the data is usually stored on the JobManager node or in HDFS, the performance ordering is Memory > FileSystem > RocksDB (the slowest), and three configuration levels (global, task, instance) are supported.
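
A minimal sketch of enabling checkpoints from user code (interval, timeout, and the HDFS path are placeholders):

```java
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Inject a checkpoint barrier into the stream every 60 seconds, with exactly-once semantics.
        env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

        // Where the snapshots go; memory, file system and RocksDB backends exist,
        // here a file-system backend pointing at an (illustrative) HDFS path.
        env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));

        // Give a large or slow checkpoint at most 2 minutes before it is discarded.
        env.getCheckpointConfig().setCheckpointTimeout(120_000);
    }
}
```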

As shown in the figure below, a total of 4 save points have been taken:

The newest data has not yet triggered another checkpoint, because the scheduled checkpoint time has not yet arrived.

(Figure: Checkpoint mechanism)

     Savepoints are actually a special implementation of checkpoints: the checkpoint mechanism is still used, except that savepoints are triggered manually by the user and persist the state data to a storage path specified by the user. Their main purpose is to preserve the state data during system upgrades or cluster maintenance, avoiding the situation where the system cannot be restored to its original computing state after downtime or a normal termination caused by operation and maintenance work.
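
From the command line, the manual trigger and the later restore typically look roughly like this (job id, jar, and paths are placeholders):

```bash
# Trigger a savepoint for a running job and write it to the given directory
bin/flink savepoint <jobId> hdfs:///flink/savepoints

# Resume the (possibly upgraded) job from the savepoint
bin/flink run -s hdfs:///flink/savepoints/savepoint-xxxx my-job.jar
```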

Conclusion

    This article mainly analyzes and discusses some of the technologies and mechanisms Flink uses for stream computing. Flink is the only distributed streaming data processing framework in the open source community that combines high throughput, low latency, and high performance. Apache Spark can only take high throughput and high performance into account, mainly because Spark Streaming does not guarantee low latency; the streaming framework Apache Storm, in turn, supports low latency and high performance but cannot meet high throughput requirements (due to the framework itself and its semantic logic). Satisfying the three goals of high throughput, low latency, and high performance at the same time is very important for a distributed stream computing framework.

