網上有很多關于pos機刷卡連接超時, broker 文件刷盤機制的知識,也有很多人為大家解答關于pos機刷卡連接超時的問題,今天pos機之家(m.www690aa.com)為大家整理了關于這方面的知識,讓我們一起來看下吧!
本文目錄一覽:
pos機刷卡連接超時
RocketMQ的存儲與讀寫是基于JDK NIO的內存映射機制(MappedByteBuffer)的,消息存儲時首先將消息追加到內存中,再根據配置的刷盤策略在不同時間刷盤。
如果是同步刷盤,消息追加到內存后,將同步調用MappedByteBuffer的force()方法;如果是異步刷盤,在消息追加到內存后會立刻返回給消息發送端。RocketMQ使用一個單獨的線程按照某一個設定的頻率執行刷盤操作。通過在broker配置文件中配置flushDiskType來設定刷盤方式,可選值為ASYNC_FLUSH(異步刷盤)、SYNC_FLUSH(同步刷盤),默認為異步刷盤。
本節以CommitLog文件刷盤機制為例來剖析RocketMQ的刷盤機制,ConsumeQueue文件、Index文件刷盤的實現原理與CommitLog刷盤機制類似。
RocketMQ處理刷盤的實現方法為Commitlog#handleDiskFlush(),刷盤流程作為消息發送、消息存儲的 子流程。值得注意的是,Index文件的刷盤并不是采取定時刷盤機制,而是每更新一次Index文 件就會將上一次的改動寫入磁盤。
1. 刷盤策略在理解RocketMQ刷盤實現之前,先理解一下上圖展示的刷盤的2種實現的:
直接通過內存映射文件,通過flush刷新到磁盤當異步刷盤且啟用了對外內存池的時候,先write到writeBuffer,然后commit到FILEchannel,最后flush到磁盤CommitLog的asyncPutMessage方法中可以看到在寫入消息之后,調用了submitFlushrequest方法執行刷盤策略:
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) { ... // 獲取最后一個 MappedFile MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); ... try { ... // todo 往mappedFile追加消息 result = mappedFile.appendMessage(msg, this.appendMessageCallback); ... } finally { putMessageLock.unlock(); } ... // todo 消息首先進入pagecache,然后執行刷盤操作, CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg); ...}復制代碼
刷盤有兩種策略:
同步刷盤,表示消息寫入到內存之后需要立刻刷到磁盤文件中。同步刷盤會構建GroupCommitRequest組提交請求并設置本次刷盤后的位置偏移量的值(寫入位置偏移量+寫入數據字節數),然后將請求添加到GroupCommitService中進行刷盤。異步刷盤,表示消息寫入內存成功之后就返回,由MQ定時將數據刷入到磁盤中,會有一定的數據丟失風險。CommitLog#submitFlushRequest如下:
public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) { // Synchronization flush 同步刷盤 if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { // 獲取GroupCommitService final GroupCommitService service = (GroupCommitService) this.flushCommitLogService; // 是否等待 if (messageExt.isWaitStoreMsgOK()) { // 構建組提交請求,傳入本次刷盤后位置的偏移量:寫入位置偏移量+寫入數據字節數 GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(), this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); // 刷盤請求 service.putRequest(request); return request.future(); } else { service.wakeup(); return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK); } } // Asynchronous flush 異步刷盤 這個就是靠os else { // 如果未使用暫存池 if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { // 喚醒刷盤線程進行刷盤 flushCommitLogService.wakeup(); } else { // 如果使用暫存池,使用commitLogService,先將數據寫入到FILECHANNEL,然后統一進行刷盤 commitLogService.wakeup(); } // 返回結果 return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK); }}復制代碼2. 同步刷盤
如果使用的是同步刷盤,首先獲取了GroupCommitService,然后構建GroupCommitRequest組提交請求,將請求添加到GroupCommitService中,GroupCommitService用于提交刷盤數據。
2.1 GroupCommitRequest提交請求GroupCommitRequest是CommitLog的內部類:
nextOffset:寫入位置偏移量+寫入數據字節數,也就是本次刷盤成功后應該對應的flush偏移量flushOKFuture:刷盤結果timeoutMillis:刷盤的超時時間,超過超時時間還未刷盤完畢會被認為超時public static class GroupCommitRequest { // 刷盤點偏移量 private final long nextOffset; // 刷盤狀態 private CompletableFuture<PutMessageStatus> flushOKFuture = new CompletableFuture<>(); private final long startTimestamp = System.currentTimeMillis(); // 超時時間 private long timeoutMillis = Long.MAX_VALUE; public GroupCommitRequest(long nextOffset, long timeoutMillis) { this.nextOffset = nextOffset; this.timeoutMillis = timeoutMillis; } public void wakeupCustomer(final PutMessageStatus putMessageStatus) { // todo 在這里調用 結束刷盤,設置刷盤狀態 this.flushOKFuture.complete(putMessageStatus); }復制代碼2.2 GroupCommitService處理刷盤
GroupCommitService是CommitLog的內部類,從繼承關系中可知它實現了Runnable接口,在run方法調用waitForRunning等待刷盤請求的提交,然后處理刷盤,不過這個線程是在什么時候啟動的呢?
public class CommitLog { /** * GroupCommit Service */ class GroupCommitService extends FlushCommitLogService { // ... // run方法 public void run() { CommitLog.log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { // 等待刷盤請求的到來 this.waitForRunning(10); // 處理刷盤 this.doCommit(); } catch (exception e) { CommitLog.log.warn(this.getServiceName() + " service has exception. ", e); } } // ... } }}復制代碼2.2.1 刷盤線程的啟動
在BrokerController的啟動方法中,可以看到調用了messageStore的start方法,前面可知使用的是DefaultMessageStore,進入到DefaultMessageStore的start方法,它又調用了commitLog的start方法,在CommitLog的start方法中,啟動了刷盤的線程和監控刷盤的線程:
public class BrokerController { public void start() throws Exception { if (this.messageStore != null) { // 啟動 this.messageStore.start(); } // ... }}public class DefaultMessageStore implements MessageStore { /** * @throws Exception */ public void start() throws Exception { // ... this.flushConsumeQueueService.start(); // 調用CommitLog的啟動方法 this.commitLog.start(); this.storeStatsService.start(); // ... }}public class CommitLog { private final FlushCommitLogService flushCommitLogService; // 刷盤 private final FlushCommitLogService commitLogService; // commitLogService public void start() { // 啟動刷盤的線程 this.flushCommitLogService.start(); flushDiskWatcher.setDaemon(true); if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { this.commitLogService.start(); } }}復制代碼2.2.2 刷盤請求的處理
既然知道了線程在何時啟動的,接下來詳細看一下GroupCommitService是如何處理刷盤提交請求的。
前面知道在GroupCommitService的run方法中,調用了waitForRunning方法等待刷盤請求,waitForRunning在GroupCommitService父類ServiceThread中實現。ServiceThread是一個抽象類,實現了Runnable接口,里面使用了CountDownLatch進行線程間的通信,大小設為1。
waitForRunning方法在進入的時候先判斷hasNotified是否為true(已通知),并嘗試將其更新為false(未通知),由于hasNotified的初始化值為false,所以首次進入的時候條件不成立,不會進入到這個處理邏輯,會繼續執行后面的代碼。
接著調用 waitPoint的reset方法將其重置為1,并調用waitPoint的await方法進行等待:
// ServiceThreadpublic abstract class ServiceThread implements Runnable { // 是否通知,初始化為false protected volatile AtomicBoolean hasNotified = new AtomicBoolean(false); // CountDownLatch用于線程間的通信 protected final CountDownLatch2 waitPoint = new CountDownLatch2(1); // 等待運行 protected void waitForRunning(long interval) { // 判斷hasNotified是否為true,并嘗試將其更新為false if (hasNotified.compareAndSet(true, false)) { // 調用onWaitEnd this.onWaitEnd(); return; } // 重置waitPoint的值,也就是值為1 waitPoint.reset(); try { // 會一直等待waitPoint值降為0 waitPoint.await(interval, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { log.error("Interrupted", e); } finally { // 是否被通知設置為false hasNotified.set(false); this.onWaitEnd(); } }}復制代碼1. 添加刷盤請求,喚醒刷盤線程
上面可知需要刷盤的時候調用了GroupCommitService的putRequest方法添加刷盤請求,在putRequest方法中,將刷盤請求GroupCommitRequest添加到了requestsWrite組提交寫請求鏈表中,然后調用wakeup方法喚醒刷盤線程,wakeup方法在它的父類ServiceThread中實現。
在wakeup方法中可以看到,首先將hasNotified更改為了true表示處于已通知狀態,然后調用了countDown方法,此時waitPoint值變成0,就會喚醒之前waitForRunning方法中一直在等待的線程。
public class CommitLog { /** * 組提交Service */ class GroupCommitService extends FlushCommitLogService { // 組提交寫請求鏈表 private volatile LinkedList<GroupCommitRequest> requestsWrite = new LinkedList<GroupCommitRequest>(); // ... // 添加提交請求 public synchronized void putRequest(final GroupCommitRequest request) { // 加鎖 lock.lock(); try { // 加入到寫請求鏈表 this.requestsWrite.add(request); } finally { lock.unlock(); } // 喚醒線程執行提交任務 this.wakeup(); } // ... } }// ServiceThreadpublic abstract class ServiceThread implements Runnable { // CountDownLatch用于線程間的通信 protected final CountDownLatch2 waitPoint = new CountDownLatch2(1); // 喚醒刷盤線程 public void wakeup() { // 更改狀態為已通知狀態 if (hasNotified.compareAndSet(false, true)) { // waitPoint的值減1,由于大小設置為1,減1之后變為0,會喚醒等待的線程 waitPoint.countDown(); } } // ...}復制代碼2. 線程被喚醒,執行刷盤前的操作
waitForRunning方法中的await方法一直在等待countdown的值變為0,當上一步調用了wakeup后,就會喚醒該線程,然后開始往下執行,在finally中可以看到將是否被通知hasNotified又設置為了false。
然后調用了onWaitEnd方法,GroupCommitService方法中重寫了該方法,里面又調用了swapRequests方法將讀寫請求列表的數據進行了交換,putRequest方法中將提交的刷盤請求放在了寫鏈表中,經過交換,數據會被放在讀鏈表中,后續進行刷盤時會從讀鏈表中獲取請求進行處理:
// ServiceThreadpublic abstract class ServiceThread implements Runnable { // CountDownLatch protected final CountDownLatch2 waitPoint = new CountDownLatch2(1); // 等待運行 protected void waitForRunning(long interval) { if (hasNotified.compareAndSet(true, false)) { // 交換 this.onWaitEnd(); return; } // 重置 waitPoint.reset(); try { // 會一直等待countdown為0 waitPoint.await(interval, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { log.error("Interrupted", e); } finally { // 是否被通知設置為false hasNotified.set(false); this.onWaitEnd(); } }}public class CommitLog { /** * 組提交Service */ class GroupCommitService extends FlushCommitLogService { // 組提交寫請求鏈表 private volatile LinkedList<GroupCommitRequest> requestsWrite = new LinkedList<GroupCommitRequest>(); // 組提交讀請求鏈表 private volatile LinkedList<GroupCommitRequest> requestsRead = new LinkedList<GroupCommitRequest>(); @Override protected void onWaitEnd() { // 交換讀寫請求列表的數據請求 this.swapRequests(); } private void swapRequests() { // 加鎖 lock.lock(); try { // 將讀寫請求鏈表的數據進行交換 LinkedList<GroupCommitRequest> tmp = this.requestsWrite; this.requestsWrite = this.requestsRead; this.requestsRead = tmp; } finally { lock.unlock(); } } // ... }}復制代碼
這里使用讀寫鏈表進行交換應該是為了提升性能,如果只使用一個鏈表,在提交請求的時候需要往鏈表中添加請求,此時需要加鎖,而刷盤線程在處理完請求之后是需要從鏈表中移除請求的,假設添加請求時加的鎖還未釋放,刷盤線程就要一直等待,而添加和處理完全可以同時進行,所以使用了兩個鏈表,在添加請求的時候使用寫鏈表,處理請求的時候對讀寫鏈表的數據進行交換使用讀鏈表,這樣只需在交換數據的時候加鎖,以此來提升性能。
3. 執行刷盤waitForRunning執行完畢后,會回到GroupCommitService中的run方法開始繼續往后執行代碼,從代碼中可以看到接下來會調用doCommit方法執行刷盤。
doCommit方法中對讀鏈表中的數據進行了判空,如果不為空,進行遍歷處理每一個提交請求,處理邏輯如下:
獲取CommitLog映射文件記錄的刷盤位置偏移量flushedWhere,判斷是否大于請求設定的刷盤位置偏移量nextOffset,正常情況下flush的位置應該小于本次刷入數據后的偏移量,所以如果flush位置大于等于本次請求設置的flush偏移量,本次將不能進行刷盤開啟一個循環,調用mappedFileQueue的flush方法執行刷盤(具體的實現在異步刷盤的時候再看),由于CommitLog大小為1G,所以本次刷完之后,如果當前已經刷入的偏移量小于請求設定的位置,表示數據未刷完,需要繼續刷,反之表示數據已經刷完,flushOK為true,for循環條件不滿足結束執行。請求處理之后會清空讀鏈表。class GroupCommitService extends FlushCommitLogService { // 組提交寫請求鏈表 // 同步刷盤任務暫存容器 private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>(); // 讀寫分離,避免了任務提交與任務執行的鎖沖突 private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>(); // 提交刷盤 private void doCommit() { synchronized (this.requestsRead) { if (!this.requestsRead.isEmpty()) { // 遍歷刷盤請求 for (GroupCommitRequest req : this.requestsRead) { // There may be a message in the next file, so a maximum of // two times the flush // 獲取映射文件的flush位置,判斷是否大于請求設定的刷盤位置 boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); // 請求1次+重試1次 for (int i = 0; i < 2 && !flushOK; i++) { // todo 刷盤操作 CommitLog.this.mappedFileQueue.flush(0); // 由于CommitLog大小為1G,所以本次刷完之后,如果當前已經刷入的偏移量小于請求設定的位置, // 表示數據未刷完,需要繼續刷,反之表示數據已經刷完,flushOK為true,for循環條件不滿足結束執行 flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); } // todo 喚醒消息發送線程并通知刷盤結果 req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT); } long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp(); if (storeTimestamp > 0) { CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); } // 請求處理完之后清空鏈表 this.requestsRead.clear(); } else { // Because of individual messages is set to not sync flush, it // will come to this process CommitLog.this.mappedFileQueue.flush(0); } } } public void run() { CommitLog.log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { // todo 等待10ms 并交換讀寫容器 this.waitForRunning(10); // todo this.doCommit(); } catch (Exception e) { CommitLog.log.warn(this.getServiceName() + " service has exception. ", e); } } // Under normal circumstances shutdown, wait for the arrival of the // request, and then flush // 睡眠10毫秒 try { Thread.sleep(10); } catch (InterruptedException e) { CommitLog.log.warn("GroupCommitService Exception, ", e); } synchronized (this) { // 交換 this.swapRequests(); } // 停止之前提交一次 this.doCommit(); CommitLog.log.info(this.getServiceName() + " service end"); }復制代碼3. 異步刷盤
上面講解了同步刷盤,接下來去看下異步刷盤,首先會判斷是否使用了暫存池,如果未開啟調用flushCommitLogService的wakeup喚醒刷盤線程,否則使用commitLogService先將數據寫入到FileChannel,然后統一進行刷盤:
public class CommitLog { public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) { // 是否是同步刷盤 if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { // ... } // 如果是異步刷盤 else { // 如果未使用暫存池 if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { // 喚醒刷盤線程進行刷盤 flushCommitLogService.wakeup(); } else { // 如果使用暫存池,使用commitLogService,先將數據寫入到FILECHANNEL,然后統一進行刷盤 commitLogService.wakeup(); } // 返回結果 return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK); } }}復制代碼
在CommitLog的構造函數中可以看到,commitLogService使用的是CommitRealTimeService進行實例化的,flushCommitLogService需要根據設置決定使用哪種類型進行實例化:
如果是同步刷盤,使用GroupCommitService,由前面的同步刷盤可知,使用的就是GroupCommitService進行刷盤的。如果是異步刷盤,使用FlushRealTimeService。所以接下來需要關注CommitRealTimeService和FlushRealTimeService:
public class CommitLog { private final FlushCommitLogService flushCommitLogService; // 刷盤Service private final FlushCommitLogService commitLogService; public CommitLog(final DefaultMessageStore defaultMessageStore) { // 如果設置的同步刷盤 if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { // 使用GroupCommitService this.flushCommitLogService = new GroupCommitService(); } else { // 使用FlushRealTimeService this.flushCommitLogService = new FlushRealTimeService(); } // commitLogService this.commitLogService = new CommitRealTimeService(); }}復制代碼3.1 CommitRealTimeService
在開啟暫存池時,會使用CommitRealTimeService,它繼承了FlushCommitLogService,所以會實現run方法,處理邏輯如下:
從配置信息中獲取提交間隔、每次提交的最少頁數和兩次提交的最大間隔時間如果當前時間大于上次提交時間+兩次提交的最大間隔時間,意味著已經有比較長的一段時間沒有進行提交了,需要盡快刷盤,此時將每次提交的最少頁數設置為0不限制提交頁數調用mappedFileQueue的commit方法進行提交,并返回提交的結果:如果結果為true表示未提交任何數據如果結果為false表示進行了數據提交,需要等待刷盤判斷提交返回結果是否返回false,如果是調用flushCommitLogService的wakeup方法喚醒刷盤線程,進行刷盤調用waitForRunning等待下一次提交處理class CommitRealTimeService extends FlushCommitLogService { // 上次提交時間戳 private long lastCommitTimestamp = 0; @Override public String getServiceName() { return CommitRealTimeService.class.getSimpleName(); } @Override public void run() { CommitLog.log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { // CommitRealTimeService線程間隔時間,默認200ms int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog(); // 一次提交任務至少包含的頁數,如果待提交數據不足,小于該參數配置的值,將忽略本次提交任務,默認4頁 int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages(); // 兩次真實提交的最大間隔時間,默認200ms int commitDataThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval(); // 開始時間 long begin = System.currentTimeMillis(); // 如果距上次提交間隔超過commitDataThoroughInterval,則本次提交忽略commitLogLeastPages //參數,也就是如果待提交數據小于指定頁數,也執行提交操作 if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) { // 提交時間 this.lastCommitTimestamp = begin; // 最少提交頁數設為0,表示不限制提交頁數 commitDataLeastPages = 0; } try { // todo 執行提交操作,將待提交數據提交到物理文件的內存映射內存區 boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages); // 提交結束時間 long end = System.currentTimeMillis(); // 如果返回false,并不代表提交失敗,而是表示有數據提交成功了,喚醒刷盤線程執行刷盤操作 if (!result) { // 再次更新提交時間戳 this.lastCommitTimestamp = end; // result = false means some data committed. //now wake up flush thread. // 喚醒flush線程進行刷盤 flushCommitLogService.wakeup(); } if (end - begin > 500) { log.info("Commit data to file costs {} ms", end - begin); } // 該線程每完成一次提交動作,將等待200ms再繼續執行下一次提交任務 this.waitForRunning(interval); } catch (Throwable e) { CommitLog.log.error(this.getServiceName() + " service has exception. ", e); } } boolean result = false; for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) { result = CommitLog.this.mappedFileQueue.commit(0); CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK")); } CommitLog.log.info(this.getServiceName() + " service end"); }}復制代碼提交
提交的方法在MappedFileQueue的commit方法中實現,處理邏輯如下:
根據記錄的CommitLog文件提交位置的偏移量獲取映射文件,如果獲取不為空,調用MappedFile的commit方法進行提交,然后返回本次提交數據的偏移量記錄本次提交的偏移量:文件的偏移量 + 提交數據的偏移量判斷本次提交的偏移量是否等于上一次的提交偏移量,如果等于表示本次未提交任何數據,返回結果置為true,否則表示提交了數據,等待刷盤,返回結果為false更新上一次提交偏移量committedWhere的值為本次的提交偏移量的值public class MappedFileQueue { protected long flushedWhere = 0; // flush的位置偏移量 private long committedWhere = 0; // 提交的位置偏移量 public boolean commit(final int commitLeastPages) { boolean result = true; // 根據提交位置的偏移量獲取映射文件 MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, this.committedWhere == 0); if (mappedFile != null) { // 調用mappedFile的commit方法進行提交,返回提交數據的偏移量 int offset = mappedFile.commit(commitLeastPages); // 記錄本次提交的偏移量:文件的偏移量 + 提交數據的偏移量 long where = mappedFile.getFileFromOffset() + offset; // 設置返回結果,如果本次提交偏移量等于上一次的提交偏移量為true,表示什么也沒干,否則表示提交了數據,等待刷盤 result = where == this.committedWhere; // 更新上一次提交偏移量的值為本次的 this.committedWhere = where; } return result; }}復制代碼3.2 MappedFile
MappedFile中記錄CommitLog的寫入位置wrotePosition、提交位置committedPosition以及flush位置flushedPosition,在commit方法中,調用了isAbleToCommit判斷是否可以提交數據,判斷的流程如下:
獲取提交數據的位置偏移量和寫入數據的位置偏移量如果最少提交頁數大于0,計算本次寫入的頁數是否大于或等于最少提交頁數本次寫入數據的頁數計算方法:寫入位置/頁大小 - flush位置/頁大小如果以上條件都滿足,判斷寫入位置是否大于flush位置,如果大于表示有一部數據未flush可以進行提交滿足提交條件后,就會調用commit0方法提交數據,將數據寫入到fileChannel中:
public class MappedFile extends ReferenceResource { // 數據寫入位置 protected final AtomicInteger wrotePosition = new AtomicInteger(0); // 數據提交位置 protected final AtomicInteger committedPosition = new AtomicInteger(0); // 數據flush位置 private final AtomicInteger flushedPosition = new AtomicInteger(0); // 提交數據 public int commit(final int commitLeastPages) { // 如果writeBuffer為空 if (writeBuffer == null) { // 不需要提交任何數據到,返回之前記錄的寫入位置 return this.wrotePosition.get(); } // 如果可以提交數據 if (this.isAbleToCommit(commitLeastPages)) { if (this.hold()) { // 提交數據 commit0(); this.release(); } else { log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get()); } } // All dirty data has been committed to FileChannel. if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) { this.transientStorePool.returnBuffer(writeBuffer); this.writeBuffer = null; } // 返回提交位置 return this.committedPosition.get(); } // 是否可以提交數據 protected boolean isAbleToCommit(final int commitLeastPages) { // 獲取提交數據的位置偏移量 int flush = this.committedPosition.get(); // 獲取寫入數據的位置偏移量 int write = this.wrotePosition.get(); if (this.isFull()) { return true; } // 如果最少提交頁數大于0 if (commitLeastPages > 0) { // 寫入位置/頁大小 - flush位置/頁大小 是否大于至少提交的頁數 return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= commitLeastPages; } // 判斷是否需要flush數據 return write > flush; } protected void commit0() { // 獲取寫入位置 int writePos = this.wrotePosition.get(); // 獲取上次提交的位置 int lastCommittedPosition = this.committedPosition.get(); if (writePos - lastCommittedPosition > 0) { try { // 創建共享緩沖區 ByteBuffer byteBuffer = writeBuffer.slice(); // 設置上一次提交位置 byteBuffer.position(lastCommittedPosition); byteBuffer.limit(writePos); this.fileChannel.position(lastCommittedPosition); // 數據寫入fileChannel this.fileChannel.write(byteBuffer); // 更新寫入的位置 this.committedPosition.set(writePos); } catch (Throwable e) { log.error("Error occurred when commit data to FileChannel.", e); } } }}復制代碼3.3 FlushRealTimeService
如果未開啟暫存池,會直接使用FlushRealTimeService進行刷盤,當然如果開啟暫存池,寫入一批數據后,同樣會使用FlushRealTimeService進行刷盤,FlushRealTimeService同樣繼承了FlushCommitLogService,是用于執行刷盤的線程,處理邏輯與提交刷盤數據邏輯相似,只不過不是提交數據,而是調用flush方法將提交的數據刷入磁盤:
從配置信息中獲取flush間隔、每次flush的最少頁數和兩次flush的最大間隔時間如果當前時間大于上次flush時間+兩次flush的最大間隔時間,意味著已經有比較長的一段時間沒有進行flush,此時將每次flush的最少頁數設置為0不限制flush頁數調用waitForRunning等待被喚醒如果被喚醒,調用mappedFileQueue的flush方法進行刷盤class FlushRealTimeService extends FlushCommitLogService { // 上一次flush的時間 private long lastFlushTimestamp = 0; private long printTimes = 0; public void run() { CommitLog.log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { // 默認為false,表示使用await方法等待;如果為true,表示使用Thread.sleep方法等待 boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed(); // 線程任務運行間隔時間 int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog(); // 一次刷盤任務至少包含頁數,如 //果待寫入數據不足,小于該參數配置的值,將忽略本次刷盤任務,默認4頁。 int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages(); // 兩次真實刷盤任務的最大間隔時間,默認10s int flushPhysicQueueThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval(); boolean printFlushProgress = false; // Print flush progress long currentTimeMillis = System.currentTimeMillis(); // 如果距上次提交數據的間隔時間超過 //flushPhysicQueueThoroughInterval,則本次刷盤任務將忽略 //flushPhysicQueueLeastPages,也就是如果待寫入數據小于指定頁 //數,也執行刷盤操作 if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) { // 更新flush時間 this.lastFlushTimestamp = currentTimeMillis; // flush至少包含的頁數置為0 flushPhysicQueueLeastPages = 0; printFlushProgress = (printTimes++ % 10) == 0; } try { // 執行一次刷盤任務前先等待指定時間間隔 if (flushCommitLogTimed) { Thread.sleep(interval); } else { // 等待flush被喚醒 this.waitForRunning(interval); } if (printFlushProgress) { // 打印刷盤進程 this.printFlushProgress(); } long begin = System.currentTimeMillis(); // todo flush方法 CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages); long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp(); if (storeTimestamp > 0) { // todo 更新checkpoint文件的CommitLog文件更新時間戳 CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); } long past = System.currentTimeMillis() - begin; if (past > 500) { log.info("Flush data to disk costs {} ms", past); } } catch (Throwable e) { CommitLog.log.warn(this.getServiceName() + " service has exception. ", e); this.printFlushProgress(); } } // Normal shutdown, to ensure that all the flush before exit // 如果服務停止,確保數據被刷盤 boolean result = false; for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) { // 進行刷盤 result = CommitLog.this.mappedFileQueue.flush(0); CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK")); } this.printFlushProgress(); CommitLog.log.info(this.getServiceName() + " service end"); }復制代碼刷盤
刷盤的方法在MappedFileQueue的flush方法中實現,處理邏輯如下:
根據 flush的位置偏移量獲取映射文件調用mappedFile的flush方法進行刷盤,并返回刷盤后的位置偏移量計算最新的flush偏移量更新flushedWhere的值為最新的flush偏移量public class MappedFileQueue { protected long flushedWhere = 0; // flush的位置偏移量 private long committedWhere = 0; // 提交的位置偏移量 // flush刷盤 public boolean flush(final int flushLeastPages) { boolean result = true; // 獲取flush的位置偏移量映射文件 MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0); if (mappedFile != null) { // 獲取時間戳 long tmpTimeStamp = mappedFile.getStoreTimestamp(); // 調用MappedFile的flush方法進行刷盤,返回刷盤后的偏移量 int offset = mappedFile.flush(flushLeastPages); // 計算最新的flush偏移量 long where = mappedFile.getFileFromOffset() + offset; result = where == this.flushedWhere; // 更新flush偏移量 this.flushedWhere = where; if (0 == flushLeastPages) { this.storeTimestamp = tmpTimeStamp; } } // 返回flush的偏移量 return result; }}復制代碼
flush的邏輯也與commit方法的邏輯類似:
調用isAbleToFlush判斷是否滿足刷盤條件,獲取上次flush位置偏移量和當前寫入位置偏移量進行如下校驗:文件是否已寫滿,即文件大小是否與寫入數據位置相等,如果相等說明文件已經寫滿需要執行刷盤,滿足刷盤條件如果最少flush頁數大于0,計算本次flush的頁數是否大于或等于最少flush頁數,如果滿足可以進行刷盤本次flush數據的頁數計算方法:寫入位置/頁大小 - flush位置/頁大小如果寫入位置偏移量是否大于flush位置偏移量,如果大于表示有數據未進行刷盤,滿足刷盤條件調用fileChannel的force或者mappedByteBuffer的force方法進行刷盤記錄本次flush的位置,并作為結果返回public class MappedFile extends ReferenceResource { protected final AtomicInteger wrotePosition = new AtomicInteger(0); protected final AtomicInteger committedPosition = new AtomicInteger(0); private final AtomicInteger flushedPosition = new AtomicInteger(0); /** * 進行刷盤并返回flush后的偏移量 */ public int flush(final int flushLeastPages) { // 是否可以刷盤 if (this.isAbleToFlush(flushLeastPages)) { if (this.hold()) { int value = getReadPosition(); try { // 如果writeBuffer不為空 if (writeBuffer != null || this.fileChannel.position() != 0) { // 將數據刷到硬盤 this.fileChannel.force(false); } else { this.mappedByteBuffer.force(); } } catch (Throwable e) { log.error("Error occurred when force data to disk.", e); } // 記錄flush位置 this.flushedPosition.set(value); this.release(); } else { log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get()); this.flushedPosition.set(getReadPosition()); } } // 返回flush位置 return this.getFlushedPosition(); } // 是否可以刷盤 private boolean isAbleToFlush(final int flushLeastPages) { // 獲取上次flush位置 int flush = this.flushedPosition.get(); // 寫入位置偏移量 int write = getReadPosition(); if (this.isFull()) { return true; } // 如果flush的頁數大于0,校驗本次flush的頁數是否滿足條件 if (flushLeastPages > 0) { // 本次flush的頁數:寫入位置偏移量/OS_PAGE_SIZE - 上次flush位置偏移量/OS_PAGE_SIZE,是否大于flushLeastPages return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= flushLeastPages; } // 寫入位置偏移量是否大于flush位置偏移量 return write > flush; } // 文件是否已寫滿 public boolean isFull() { // 文件大小是否與寫入數據位置相等 return this.fileSize == this.wrotePosition.get(); } /** * 返回當前有效數據的位置 */ public int getReadPosition() { // 如果writeBuffer為空使用寫入位置,否則使用提交位置 return this.writeBuffer == null ? this.wrotePosition.get() : this.committedPosition.get(); }}復制代碼4. 總結
同步刷盤總體流程:
異步刷盤總體流程:
以上就是關于pos機刷卡連接超時, broker 文件刷盤機制的知識,后面我們會繼續為大家整理關于pos機刷卡連接超時的知識,希望能夠幫助到大家!
