Buffer Pool Manager主要实现几个功能,向disk写新的page,从disk里fetch page,刷新page等,这个部分千万千万反复看视频和PPT,当对page进行内容操作的时候会对该page进行pin,跟PPT上所说的一样。
实现细节
Notice
page.h
1 2 3 4 5 6 7 8 9 10 11 12 13 14
private: /** Zeroes out the data that is held within the page. */ inlinevoidResetMemory(){ memset(data_, OFFSET_PAGE_START, PAGE_SIZE); }
/** The actual data that is stored within a page. */ char data_[PAGE_SIZE]{}; /** The ID of this page. */ page_id_t page_id_ = INVALID_PAGE_ID; /** The pin count of this page. */ int pin_count_ = 0; /** True if the page is dirty, i.e. it is different from its corresponding page on disk. */ bool is_dirty_ = false; /** Page latch. */ ReaderWriterLatch rwlatch_;
/** Number of pages in the buffer pool. */ constsize_t pool_size_; /** How many instances are in the parallel BPM (if present, otherwise just 1 BPI) */ constuint32_t num_instances_ = 1; /** Index of this BPI in the parallel BPM (if present, otherwise just 0) */ constuint32_t instance_index_ = 0; /** Each BPI maintains its own counter for page_ids to hand out, must ensure they mod back to its instance_index_ */ std::atomic<page_id_t> next_page_id_ = instance_index_;
/** Array of buffer pool pages. */ Page *pages_; /** Pointer to the disk manager. */ DiskManager *disk_manager_ __attribute__((__unused__)); /** Pointer to the log manager. */ LogManager *log_manager_ __attribute__((__unused__)); /** Page table for keeping track of buffer pool pages. */ std::unordered_map<page_id_t, frame_id_t> page_table_; /** Replacer to find unpinned pages for replacement. */ Replacer *replacer_; /** List of free pages. */ std::list<frame_id_t> free_list_; /** This latch protects shared data structures. We recommend updating this comment to describe what it protects. */ std::mutex latch_;
disk_manager.h
1 2 3 4 5 6 7 8 9 10 11 12 13
/** * Write a page to the database file. * @param page_id id of the page * @param page_data raw page data */ voidWritePage(page_id_t page_id, constchar *page_data);
/** * Read a page from the database file. * @param page_id id of the page * @param[out] page_data output buffer */ voidReadPage(page_id_t page_id, char *page_data);
TODO 1 FlushPgImp
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
boolBufferPoolManagerInstance::FlushPgImp(page_id_t page_id){ // Make sure you call DiskManager::WritePage! std::lock_guard<std::mutex> lock_guard(latch_); if (page_id == INVALID_PAGE_ID) { returnfalse; } auto item = page_table_.find(page_id); if (item == page_table_.end()) { returnfalse; }
Page *BufferPoolManagerInstance::NewPgImp(page_id_t *page_id){ // 0. Make sure you call AllocatePage! // 1. If all the pages in the buffer pool are pinned, return nullptr. // 2. Pick a victim page P from either the free list or the replacer. Always pick from the free list first. // 3. Update P's metadata, zero out memory and add P to the page table. // 4. Set the page ID output parameter. Return a pointer to P. std::lock_guard<std::mutex> lock_guard(latch_);
Page *BufferPoolManagerInstance::FetchPgImp(page_id_t page_id){ // 1. Search the page table for the requested page (P). // 1.1 If P exists, pin it and return it immediately. // 1.2 If P does not exist, find a replacement page (R) from either the free list or the replacer. // Note that pages are always found from the free list first. // 2. If R is dirty, write it back to the disk. // 3. Delete R from the page table and insert P. // 4. Update P's metadata, read in the page content from disk, and then return a pointer to P.
boolBufferPoolManagerInstance::DeletePgImp(page_id_t page_id){ // 0. Make sure you call DeallocatePage! // 1. Search the page table for the requested page (P). // 1. If P does not exist, return true. // 2. If P exists, but has a non-zero pin-count, return false. Someone is using the page. // 3. Otherwise, P can be deleted. Remove P from the page table, reset its metadata and return it to the free list.
std::lock_guard<std::mutex> lock_guard(latch_);
DeallocatePage(page_id);
auto item = page_table_.find(page_id); if (item == page_table_.end()) { returntrue; }
voidBufferPoolManagerInstance::ValidatePageId(constpage_id_t page_id)const{ assert(page_id % num_instances_ == instance_index_); // allocated pages mod back to this BPI }
通过观察上述的代码跟踪,可以分析得出,每次对page_id的分配就是对其进行hash取模来判断是存在哪个pool;然后为了达到并行,我们直接每个函数内操作该buffer pool instance内地数据就好了,其他的就是分配页儿的策略了,看如何达到比较好的性能,哈哈哈我不会,我就每次newpage的时候去每个instance挨个找,性能可能会极差。
ParallelBufferPoolManager::ParallelBufferPoolManager(size_t num_instances, size_t pool_size, DiskManager *disk_manager, LogManager *log_manager) { // Allocate and create individual BufferPoolManagerInstances std::lock_guard<std::mutex> lock_guard(parallel_buffer_pool_latch_); num_instance_ = num_instances; pool_size_ = pool_size; start_index_ = 0; for (size_t i = 0; i < num_instance_; ++i) { buffer_pool_manager_.push_back( newBufferPoolManagerInstance(pool_size_, num_instance_, i, disk_manager, log_manager)); } }
TODO 3 Destructor
1 2 3 4 5 6
// Update constructor to destruct all BufferPoolManagerInstances and deallocate any associated memory ParallelBufferPoolManager::~ParallelBufferPoolManager() { for (size_t i = 0; i < num_instance_; ++i) { delete buffer_pool_manager_[i]; } }
TODO 4 GetPoolSize
1 2 3 4
size_tParallelBufferPoolManager::GetPoolSize(){ // Get size of all BufferPoolManagerInstances return pool_size_ * num_instances_; }
TODO 5 GetBufferPoolManager
1 2 3 4 5
BufferPoolManager *ParallelBufferPoolManager::GetBufferPoolManager(page_id_t page_id){ // Get BufferPoolManager responsible for handling given page id. You can use this method in your other methods. return buffer_pool_manager_[page_id % num_instance_]; }
TODO 6 FetchPgImp
1 2 3 4
Page *ParallelBufferPoolManager::FetchPgImp(page_id_t page_id){ // Fetch page for page_id from responsible BufferPoolManagerInstance returnGetBufferPoolManager(page_id)->FetchPage(page_id); }
boolParallelBufferPoolManager::FlushPgImp(page_id_t page_id){ // Flush page_id from responsible BufferPoolManagerInstance returnGetBufferPoolManager(page_id)->FlushPage(page_id); }
TODO 9 NewPgImp
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
Page *ParallelBufferPoolManager::NewPgImp(page_id_t *page_id){ // create new page. We will request page allocation in a round robin manner from the underlying // BufferPoolManagerInstances // 1. From a starting index of the BPMIs, call NewPageImpl until either 1) success and return 2) looped around to // starting index and return nullptr // 2. Bump the starting index (mod number of instances) to start search at a different BPMI each time this function // is called
std::lock_guard<std::mutex> lock_guard(parallel_buffer_pool_latch_); for (size_t i = start_index_; i < start_index_ + num_instance_; ++i) { BufferPoolManager *manager = buffer_pool_manager_[i % num_instance_]; Page *page = manager->NewPage(page_id); if (page != nullptr) { start_index_ = (i + 1) % num_instance_; return page; } } returnnullptr; }
TODO 10 DeletePgImp
1 2 3 4
boolParallelBufferPoolManager::DeletePgImp(page_id_t page_id){ // Delete page_id from responsible BufferPoolManagerInstance returnGetBufferPoolManager(page_id)->DeletePage(page_id); }
TODO 11 FlushAllPgsImp
1 2 3 4 5 6
voidParallelBufferPoolManager::FlushAllPgsImp(){ // flush all pages from all BufferPoolManagerInstances for (size_t i = 0; i < num_instance_; ++i) { buffer_pool_manager_[i]->FlushAllPages(); } }
Test
1 2
make parallel_buffer_pool_manager_test ./test/parallel_buffer_pool_manager_test
1 2
valgrind --leak-check=full --suppressions=../build_support/valgrind.supp ./test/parallel_buffer_pool_manager_test --gtest_filter=ParallelBufferPoolManagerTest.BinaryDataTest:ParallelBufferPoolManagerTest.Sample Test