Ketersediaan Tinggi Kafka (dalam)
Berdasarkan artikel sebelumnya, artikel ini menjelaskan mekanisme HA Kafka secara lebih mendalam, terutama menjelaskan berbagai skenario yang terkait dengan HA, seperti Broker failover, Controller failover, Topic creation / deletion, Broker startup, Follower fetch data from Leader dan prosedur pemrosesan terperinci lainnya. .
Pemrosesan Kegagalan Broker oleh Pengontrol
Pengontrol mendaftarkan Watch pada node / brokers / id dari Zookeeper. Setelah ada Broker down (waktu henti digunakan dalam artikel ini untuk mewakili situasi apa pun yang menurut Kafka adalah Broker die, termasuk namun tidak terbatas pada kegagalan daya mesin, tidak tersedianya jaringan, Stop The World yang disebabkan oleh GC, proses crash, dll.), Yang sesuai dengan Zookeeper Znode akan dihapus secara otomatis, Zookeeper akan mengaktifkan Pengontrol Jam yang terdaftar, Pengontrol bisa mendapatkan daftar Broker terbaru yang masih hidup. Controller memutuskan untuk set_p, yang berisi semua Partisi pada semua Broker yang down. Untuk setiap Partisi di set_p: Baca ISR Partisi saat ini dari / brokers / topics // partisi // state. Tentukan pemimpin baru dari Partisi. Jika setidaknya satu replika di ISR saat ini bertahan, salah satunya dipilih sebagai pemimpin baru, dan ISR baru berisi semua replika yang masih ada di ISR saat ini. Jika tidak, pilih Replika yang masih hidup di Partisi sebagai Leader dan ISR baru (mungkin ada potensi kehilangan data dalam skenario ini). Jika semua replika partisi tidak aktif, atur pemimpin baru ke -1. Tuliskan pemimpin baru, ISR, leader_epoch baru, dan controller_epoch ke / brokers / topics // partisi // state. Perhatikan bahwa operasi ini hanya akan dijalankan jika versi Pengontrol tidak berubah dari 3.1 ke 3.3, jika tidak, lewati ke 3.1. Kirim perintah LeaderAndISRRequest ke broker yang terkait dengan set_p secara langsung melalui RPC. Pengontrol dapat mengirim banyak perintah dalam satu operasi RPC untuk meningkatkan efisiensi. Diagram urutan failover Broker ditampilkan di bawah ini.
Struktur LeaderAndIsrRequest adalah sebagai berikut
Struktur LeaderAndIsrResponse adalah sebagai berikut
Buat / Hapus Topik
Controller mendaftarkan Watch pada node / brokers / topik dari Zookeeper.Setelah sebuah topik dibuat atau dihapus, Controller akan mendapatkan alokasi partisi / replika dari topik yang baru dibuat / dihapus melalui Watch. Untuk operasi hapus topik, alat topik akan menyimpan nama topik di / admin / delete_topics. Jika delete.topic.enable bernilai true, Watch yang didaftarkan oleh Controller pada / admin / delete_topics dipecat, dan Controller mengirimkan StopReplicaRequest ke Broker terkait melalui callback; jika false, Controller tidak akan mendaftarkan Watch on / admin / delete_topics, dan juga Tidak akan ada respons ke acara tersebut, dan operasi Topik hanya direkam tetapi tidak dijalankan. Untuk operasi Buat Topik, Pengontrol membaca daftar semua Broker yang saat ini tersedia dari / brokers / id, untuk setiap Partisi di set_p: Pilih Broker yang tersedia dari semua Replika (disebut AR) yang ditugaskan ke Partisi sebagai Leader baru, dan atur AR sebagai ISR baru (karena topiknya baru dibuat, jadi semua Replika di AR tidak memiliki Data, dapat dianggap bahwa semuanya disinkronkan, yaitu, semuanya ada di ISR, dan Replika apa pun dapat digunakan sebagai Leader) Tulis pemimpin baru dan ISR ke / brokers / topik // partisi / Kirim LeaderAndISRRequest langsung ke Broker terkait melalui RPC. Buat diagram urutan Topik seperti yang ditunjukkan di bawah ini.
Proses permintaan tanggapan broker
Broker menerima dan menanggapi berbagai permintaan melalui kafka.network.SocketServer dan modul terkait. Seluruh modul komunikasi jaringan dikembangkan berdasarkan Java NIO dan mengadopsi model Reaktor, yang mencakup 1 Akseptor yang bertanggung jawab untuk menerima permintaan pelanggan, N Prosesor yang bertanggung jawab untuk membaca dan menulis data, dan M Handler yang memproses logika bisnis.
Tanggung jawab utama Penerima adalah mendengarkan dan menerima permintaan koneksi dari klien (pemrakarsa permintaan, termasuk namun tidak terbatas pada Produsen, Konsumen, Pengontrol, Alat Admin), dan membuat saluran transmisi data dengan klien, dan kemudian menentukan Prosesor untuk klien, Pada titik ini, tugas yang diminta oleh klien telah selesai, dan dapat menanggapi permintaan koneksi klien berikutnya. Kode intinya adalah sebagai berikut.
Prosesor terutama bertanggung jawab untuk membaca data dari klien dan mengembalikan respons ke klien. Prosesor tidak memproses logika bisnis tertentu itu sendiri, dan memelihara antrian secara internal untuk menyimpan semua SocketChannels yang dialokasikan untuknya. Metode yang dijalankan prosesor secara siklis akan mengeluarkan SocketChannel baru dari antrian dan mendaftarkan SelectionKey.OP_READ-nya pada selektor, dan kemudian secara siklis memproses siap baca (permintaan) dan tulis (respons). Setelah prosesor membaca data, itu merangkumnya menjadi objek Request dan menyerahkannya ke RequestChannel.
RequestChannel adalah tempat di mana Processor dan KafkaRequestHandler bertukar data. Ini berisi permintaan antrian requestQueue untuk menyimpan Permintaan yang ditambahkan oleh Prosesor. KafkaRequestHandler akan mengambil Permintaan darinya untuk diproses; itu juga berisi respondQueue untuk menyimpan KafkaRequestHandler dan mengembalikannya ke pelanggan setelah memproses Permintaan. Tanggapan di akhir.
Prosesor akan secara berurutan mengambil Responses yang disimpan di responseQueue di requestChannel melalui metode processNewResponses, dan mendaftarkan acara SelectionKey.OP_WRITE yang sesuai ke selektor. Saat metode pemilihan pemilih kembali, untuk saluran yang dapat ditulis yang terdeteksi, panggil metode tulis untuk mengembalikan Respons ke klien.
KafkaRequestHandler secara siklis mengambil Permintaan dari RequestChannel dan menyerahkannya ke kafka.server.KafkaApis untuk memproses logika bisnis tertentu.
Proses respons LeaderAndIsrRequest
Untuk LeaderAndIsrRequest yang diterima, Broker terutama memproses beLeaderOrFollower dari ReplicaManager, dan prosesnya adalah sebagai berikut:
Jika controllerEpoch dalam permintaan kurang dari controllerEpoch terbaru, ia akan langsung mengembalikan ErrorMapping.StaleControllerEpochCode. Untuk setiap elemen di partitionStateInfos dalam permintaan, yaitu ((topic, partitionId), partitionStateInfo): Jika waktu pemimpin di partitionStateInfo lebih besar dari waktu pemimpin partisi yang sesuai dengan (topic, partitionId) yang disimpan di Manajer Replika saat ini, maka: Jika brokerid saat ini (atau replika id) ada di partitionStateInfo, simpan partisi dan partitionStateInfo di HashMap bernama partitionState Jika tidak, itu berarti bahwa Pialang tidak ada dalam daftar Replika yang dialokasikan oleh Partisi, dan informasinya dicatat di log Jika tidak, kode Kesalahan yang sesuai (ErrorMapping.StaleLeaderEpochCode) akan disimpan di Respon Filter semua catatan di partitionState di mana Leader dan Broker ID saat ini sama dan simpan di partitionsTobeLeader, dan simpan catatan lain di partisiToBeFollower. Jika partitionsTobeLeader tidak kosong, jalankan makeLeaders. Jika partitionsToBeFollower tidak kosong, jalankan metode makeFollowers di atasnya. Jika utas highwatermak belum dimulai, mulai dan setel hwThreadInitialized ke true. Tutup semua Pengambil dalam kondisi Idle. Proses pemrosesan LeaderAndIsrRequest ditunjukkan pada gambar berikut
Proses startup broker
Setelah Broker dimulai, node turunan sementara (node Ephemeral) dibuat di bawah Zookeeper / brokers / idszonde sesuai dengan ID-nya. Setelah pembuatan berhasil, ReplicaStateMachine Pengontrol mendaftarkan Broker Change Watch di atasnya dan akan diaktifkan, sehingga menyelesaikan hal berikut dengan memanggil kembali metode KafkaController.onBrokerStartup langkah:
1. Kirim UpdateMetadataRequest ke semua Broker yang baru dimulai, yang didefinisikan sebagai berikut
2. Setel semua Replika pada Broker yang baru dimulai ke status OnlineReplica, dan Broker ini akan memulai utas watermark tinggi untuk Partisi ini.
3. Picu OnlinePartitionStateChange melalui partitionStateMachine.
Pengontrol Kegagalan
Pengontrol juga membutuhkan Failover. Setiap Broker akan mendaftarkan Watch on the Controller Path (/ controller). Ketika Pengontrol saat ini gagal, Jalur Pengontrol yang sesuai akan secara otomatis menghilang (karena ini adalah Node Efemeral). Pada saat ini, Arloji diaktifkan, dan semua Broker "hidup" akan menjalankan Pengontrol baru (buat Jalur Pengontrol baru), tetapi Hanya satu kampanye yang akan berhasil (ini dijamin oleh Zookeeper). Pemenang pemilihan adalah Pemimpin baru, dan yang kalah dalam pemilihan akan mendaftar ulang Awasi di Jalur Pengontrol baru. Karena Zookeeper's Watch hanya sekali, itu menjadi tidak valid setelah ditembakkan sekali, jadi perlu didaftarkan ulang.
Setelah Broker berhasil memilih untuk menjadi Pengendali baru, ini akan memicu metode KafkaController.onControllerFailover, dan menyelesaikan operasi berikut dalam metode ini:
Membaca dan menambah Waktu Pengontrol. Daftarkan Watch di ReassignedPartitions Patch (/ admin / reassign_partitions). Daftarkan Watch di Jalur PreferredReplicaElection (/ admin / prefer_replica_election). Daftarkan Watch di Broker Topik Patch (/ broker / topik) melalui partitionStateMachine. Jika delete.topic.enable disetel ke true (nilai defaultnya salah), partitionStateMachine akan mendaftarkan Watch on Delete Topic Patch (/ admin / delete_topics). Daftarkan Watch di Broker Id Patch (/ brokers / id) melalui replicaStateMachine. Inisialisasi objek ControllerContext, setel semua topik saat ini, daftar Broker yang "hidup", Pemimpin dan ISR dari semua Partisi, dll. Mulai replicaStateMachine dan partitionStateMachine. Setel status brokerState ke RunningAsController. Kirim informasi Kepemimpinan setiap Partisi ke semua Broker yang "hidup". Jika auto.leader.rebalance.enable dikonfigurasi sebagai true (nilai default adalah true), thread partisi-rebalance akan dimulai. Jika delete.topic.enable disetel ke true dan ada nilai di Delete Topic Patch (/ admin / delete_topics), topik terkait akan dihapus. Distribusi ulang partisi
Setelah alat manajemen mengeluarkan permintaan penetapan ulang Partisi, alat tersebut akan menulis informasi terkait ke / admin / reassign_partitions, dan operasi ini akan memicu ReassignedPartitionsIsrChangeListener, untuk menyelesaikan operasi berikut dengan menjalankan fungsi callback KafkaController.onPartitionReassignment:
Perbarui AR (Replika yang Ditugaskan Saat Ini) di Zookeeper ke OAR (Daftar asli replika untuk partisi) + RAR (replika yang ditetapkan ulang). Paksa untuk memperbarui periode pemimpin di Zookeeper, dan kirim LeaderAndIsrRequest ke setiap Replika di AR. Setel Replica di RAR-OAR ke status NewReplica. Tunggu sampai semua replika di RAR disinkronkan dengan pemimpinnya. Setel semua Replika dalam RAR ke status OnlineReplica. Setel AR di Cache ke RAR. Jika pemimpin tidak ada dalam RAR, pemimpin baru dipilih kembali dari RAR dan LeaderAndIsrRequest dikirim. Jika pemimpin baru tidak dipilih dari RAR, periode pemimpin di Penjaga Zookeeper harus ditingkatkan. Setel semua Replika di OAR-RAR ke status OfflineReplica, prosesnya terdiri dari dua bagian. Pertama, hapus OAR-RAR di ISR di Zookeeper dan kirim LeaderAndIsrRequest ke Leader untuk memberi tahu bahwa Replika ini telah dihapus dari ISR; kedua, kirim StopReplicaRequest ke Replika di OAR-RAR agar tidak lagi dialokasikan ke Partisi Replika. Setel semua Replika dalam status OAR-RAR ke NonExistentReplica untuk menghapusnya dari disk. Setel AR di Zookeeper ke RAR. Hapus / admin / reassign_partition. catatan: Langkah terakhir adalah memperbarui AR di Zookeeper, karena ini adalah satu-satunya tempat di mana AR disimpan secara persisten. Jika Pengontrol macet sebelum langkah ini, Pengontrol baru masih dapat melanjutkan proses.
Berikut ini adalah kasus redistribusi partisi, OAR = {1, 2, 3}, RAR = {4, 5, 6}, jalur AR dan Leader / ISR di Zookeeper selama proses redistribusi partisi adalah sebagai berikut
ARLeader / ISRStep1,2,31 / 1,2,3 kondisi awal1,2,3,4,5,61 / 1,2,3step 21,2,3,4,5,61 / 1,2,3,4, 5,6 langkah 41,2,3,4,5,61 / 1,2,3,4,5,6 langkah 71,2,3,4,5,64 / 4,5,6 langkah 84,5,64 / 4 , 5,6 langkah 10
Follower mendapatkan data dari Leader Fetch
Follower mendapatkan pesan tersebut dengan mengirimkan FetchRequest ke Leader, struktur FetchRequest adalah sebagai berikut
Seperti yang dapat dilihat dari struktur FetchRequest, setiap permintaan Fetch harus menentukan waktu tunggu maksimum dan jumlah byte minimum yang akan diperoleh, serta Peta yang terdiri dari TopicAndPartition dan PartitionFetchInfo. Faktanya, data dari Leader dari Follower dan Pengambilan dari Konsumen dari Broker semuanya dilakukan melalui permintaan FetchRequest, jadi dalam struktur FetchRequest, salah satu kolomnya adalah clientID, dan nilai defaultnya adalah ConsumerConfig.DefaultClientId.
Setelah Pimpinan menerima permintaan Ambil, Kafka menanggapi permintaan tersebut melalui KafkaApis.handleFetchRequest. Proses tanggapannya adalah sebagai berikut:
ReplicaManager membaca data dan menyimpannya di dataRead sesuai dengan permintaan.
Jika permintaan berasal dari Follower, perbarui LEO yang sesuai (offset akhir log) dan Tanda Air Tinggi dari Partisi yang sesuai Hitung panjang pesan yang dapat dibaca (dalam byte) menurut dataRead dan simpan dalam bytesReadable. Jika salah satu dari 4 ketentuan berikut ini terpenuhi, data yang sesuai akan segera dikembalikan Permintaan ambil tidak ingin menunggu, yaitu fetchRequest.macWait < = 0 Permintaan Fetch tidak mengharuskan pesan diambil, yaitu fetchRequest.numPartitions < = 0, yaitu requestInfo kosong Ada cukup data untuk dikembalikan, yaitu bytesReadable > = fetchRequest.minBytes Pengecualian terjadi saat membaca data Jika 4 ketentuan di atas tidak terpenuhi, FetchRequest tidak akan langsung dikembalikan, dan permintaan akan dikemas sebagai DelayedFetch. Periksa apakah DeplayedFetch puas, jika puas, kembalikan permintaan, jika tidak tambahkan permintaan ke daftar Watch Leader mengembalikan pesan ke Follower berupa FetchResponse, struktur FetchResponse adalah sebagai berikut
-
- Memecahkan kemiringan data Spark (3) Gunakan awalan acak untuk menyebarkan dan memiringkan kunci
-
- Pada Pertemuan Pembacaan dan Editor Musim Gugur Nanjing, orang-orang besar di surat kabar komputer memanggil Anda untuk "menghadapi basis"!
-
- Xiaomi Mix2 akhirnya mengantarkan pembaruan versi stabil, dan juga mendukung aplikasi buka kunci wajah
-
- Perpustakaan deteksi wajah open source dari profesor Universitas Shenzhen dikenal sebagai yang tercepat dalam sejarah
-
- Dominasi lagi! Skor komposit Huawei P30 Pro DXO 112 poin
-
- RTX 2060 spesifikasi resmi, kinerja, harga eksposur penuh: di luar GTX 1070 Ti
-
- JD 818 Mobile Phone Festival: Kedua ponsel ini direkomendasikan oleh banyak penggemar!
-
- Memecahkan kemiringan data Spark (2) Gabung sisi peta, bukan Kurangi
-
- Lenovo Z5 meningkatkan Android P, pengguna Xiaomi Meizu tidak bisa berkata-kata
-
- Bisakah WeChat dibakar setelah membaca? Fungsi ini sangat tersembunyi
-
- Generasi baru dari eksposur mobil nyata Mazda 3: yang paling tampan dalam sejarah desain jiwa baru
-
- Memecahkan kemiringan data Spark (1) Mendistribusikan kunci berbeda dari tugas yang sama