1 | /* |
2 | * Copyright 2004-2014 H2 Group. Multiple-Licensed under the MPL 2.0, |
3 | * and the EPL 1.0 (http://h2database.com/html/license.html). |
4 | * Initial Developer: H2 Group |
5 | */ |
6 | package org.h2.store; |
7 | |
8 | import java.io.IOException; |
9 | import java.util.ArrayList; |
10 | import java.util.Arrays; |
11 | import java.util.HashMap; |
12 | |
13 | import org.h2.api.ErrorCode; |
14 | import org.h2.compress.CompressLZF; |
15 | import org.h2.engine.Session; |
16 | import org.h2.engine.SysProperties; |
17 | import org.h2.message.DbException; |
18 | import org.h2.message.Trace; |
19 | import org.h2.result.Row; |
20 | import org.h2.util.BitField; |
21 | import org.h2.util.IntArray; |
22 | import org.h2.util.IntIntHashMap; |
23 | import org.h2.util.New; |
24 | import org.h2.value.Value; |
25 | import org.h2.value.ValueNull; |
26 | |
27 | /** |
28 | * Transaction log mechanism. The stream contains a list of records. The data |
29 | * format for a record is: |
30 | * <ul> |
31 | * <li>type (0: no-op, 1: undo, 2: commit, ...)</li> |
32 | * <li>data</li> |
33 | * </ul> |
34 | * The transaction log is split into sections. |
35 | * A checkpoint starts a new section. |
36 | */ |
37 | public class PageLog { |
38 | |
/**
 * No operation. The record consists of the type byte only.
 */
public static final int NOOP = 0;

/**
 * An undo log entry. Format: page id: varInt, size: varInt, page. Size 0
 * means the page follows uncompressed (page size bytes), size 1 means empty
 * page (no page data follows), otherwise the size is the number of
 * compressed bytes that follow.
 */
public static final int UNDO = 1;

/**
 * A commit entry of a session.
 * Format: session id: varInt.
 */
public static final int COMMIT = 2;

/**
 * A prepare commit entry for a session.
 * Format: session id: varInt, transaction name: string.
 */
public static final int PREPARE_COMMIT = 3;

/**
 * Roll back a prepared transaction.
 * Format: session id: varInt.
 * During recovery this entry is only traced, it is not acted upon.
 */
public static final int ROLLBACK = 4;

/**
 * Add a record to a table.
 * Format: session id: varInt, table id: varInt, row (row key: varLong,
 * data length: varInt, data: column count followed by the values).
 */
public static final int ADD = 5;

/**
 * Remove a record from a table.
 * Format: session id: varInt, table id: varInt, row key: varLong.
 * Only the key of the row is stored, not the row data.
 */
public static final int REMOVE = 6;

/**
 * Truncate a table.
 * Format: session id: varInt, table id: varInt.
 */
public static final int TRUNCATE = 7;

/**
 * Perform a checkpoint. The log section id is incremented.
 * Format: -
 */
public static final int CHECKPOINT = 8;

/**
 * Free log pages.
 * Format: count: varInt, followed by count page ids (each a varInt).
 */
public static final int FREE_LOG = 9;

/**
 * The recovery stage to undo changes (re-apply the backup).
 */
static final int RECOVERY_STAGE_UNDO = 0;

/**
 * The recovery stage to allocate pages used by the transaction log.
 */
static final int RECOVERY_STAGE_ALLOCATE = 1;

/**
 * The recovery stage to redo operations.
 */
static final int RECOVERY_STAGE_REDO = 2;
113 | |
/**
 * Whether addUndo tries to compress the page image before writing it.
 */
private static final boolean COMPRESS_UNDO = true;

/**
 * The page store this log belongs to.
 */
private final PageStore store;

/**
 * The trace object, obtained from the page store.
 */
private final Trace trace;

/**
 * A reusable buffer for building log records. It is created when the log
 * is opened for writing and set to null when the log is closed.
 */
private Data writeBuffer;

/**
 * The stream the log records are written to, or null if the log is not
 * open for writing.
 */
private PageOutputStream pageOut;

/**
 * The first trunk page of the log.
 */
private int firstTrunkPage;

/**
 * The first data page of the log, as given to openForReading.
 */
private int firstDataPage;

/**
 * A scratch buffer used to serialize the values of a row.
 */
private final Data dataBuffer;

/**
 * The log key. It is incremented each time the log is opened for writing.
 */
private int logKey;

/**
 * The id of the current log section (incremented by each checkpoint), and
 * the number of row / truncate operations logged in the current section.
 */
private int logSectionId, logPos;

/**
 * The id of the first log section that was not removed yet.
 */
private int firstSectionId;

/**
 * The compression tool used for undo page images.
 */
private final CompressLZF compress;

/**
 * The buffer for compressed page images (twice the page size).
 */
private final byte[] compressBuffer;

/**
 * If the bit is set, the given page was written to the current log section.
 * The undo entry of these pages doesn't need to be written again.
 */
private BitField undo = new BitField();

/**
 * The undo entry of those pages was written in any log section.
 * These pages may not be used in the transaction log.
 */
private final BitField undoAll = new BitField();

/**
 * The map of section ids (key) and data page where the section starts
 * (value).
 */
private final IntIntHashMap logSectionPageMap = new IntIntHashMap();

/**
 * The session state map.
 * Only used during recovery.
 */
private HashMap<Integer, SessionState> sessionStates = New.hashMap();

/**
 * The map of pages used by the transaction log.
 * Only used during recovery.
 */
private BitField usedLogPages;

/**
 * This flag is set while freeing up pages.
 */
private boolean freeing;
165 | |
166 | PageLog(PageStore store) { |
167 | this.store = store; |
168 | dataBuffer = store.createData(); |
169 | trace = store.getTrace(); |
170 | compress = new CompressLZF(); |
171 | compressBuffer = new byte[store.getPageSize() * 2]; |
172 | } |
173 | |
174 | /** |
175 | * Open the log for writing. For an existing database, the recovery |
176 | * must be run first. |
177 | * |
178 | * @param newFirstTrunkPage the first trunk page |
179 | * @param atEnd whether only pages at the end of the file should be used |
180 | */ |
181 | void openForWriting(int newFirstTrunkPage, boolean atEnd) { |
182 | trace.debug("log openForWriting firstPage: " + newFirstTrunkPage); |
183 | this.firstTrunkPage = newFirstTrunkPage; |
184 | logKey++; |
185 | pageOut = new PageOutputStream(store, |
186 | newFirstTrunkPage, undoAll, logKey, atEnd); |
187 | pageOut.reserve(1); |
188 | // pageBuffer = new BufferedOutputStream(pageOut, 8 * 1024); |
189 | store.setLogFirstPage(logKey, newFirstTrunkPage, |
190 | pageOut.getCurrentDataPageId()); |
191 | writeBuffer = store.createData(); |
192 | } |
193 | |
194 | /** |
195 | * Free up all pages allocated by the log. |
196 | */ |
197 | void free() { |
198 | if (trace.isDebugEnabled()) { |
199 | trace.debug("log free"); |
200 | } |
201 | int currentDataPage = 0; |
202 | if (pageOut != null) { |
203 | currentDataPage = pageOut.getCurrentDataPageId(); |
204 | pageOut.freeReserved(); |
205 | } |
206 | try { |
207 | freeing = true; |
208 | int first = 0; |
209 | int loopDetect = 1024, loopCount = 0; |
210 | PageStreamTrunk.Iterator it = new PageStreamTrunk.Iterator( |
211 | store, firstTrunkPage); |
212 | while (firstTrunkPage != 0 && firstTrunkPage < store.getPageCount()) { |
213 | PageStreamTrunk t = it.next(); |
214 | if (t == null) { |
215 | if (it.canDelete()) { |
216 | store.free(firstTrunkPage, false); |
217 | } |
218 | break; |
219 | } |
220 | if (loopCount++ >= loopDetect) { |
221 | first = t.getPos(); |
222 | loopCount = 0; |
223 | loopDetect *= 2; |
224 | } else if (first != 0 && first == t.getPos()) { |
225 | throw DbException.throwInternalError( |
226 | "endless loop at " + t); |
227 | } |
228 | t.free(currentDataPage); |
229 | firstTrunkPage = t.getNextTrunk(); |
230 | } |
231 | } finally { |
232 | freeing = false; |
233 | } |
234 | } |
235 | |
236 | /** |
237 | * Open the log for reading. |
238 | * |
239 | * @param newLogKey the first expected log key |
240 | * @param newFirstTrunkPage the first trunk page |
241 | * @param newFirstDataPage the index of the first data page |
242 | */ |
243 | void openForReading(int newLogKey, int newFirstTrunkPage, |
244 | int newFirstDataPage) { |
245 | this.logKey = newLogKey; |
246 | this.firstTrunkPage = newFirstTrunkPage; |
247 | this.firstDataPage = newFirstDataPage; |
248 | } |
249 | |
250 | /** |
251 | * Run one recovery stage. There are three recovery stages: 0: only the undo |
252 | * steps are run (restoring the state before the last checkpoint). 1: the |
253 | * pages that are used by the transaction log are allocated. 2: the |
254 | * committed operations are re-applied. |
255 | * |
256 | * @param stage the recovery stage |
257 | * @return whether the transaction log was empty |
258 | */ |
259 | boolean recover(int stage) { |
260 | if (trace.isDebugEnabled()) { |
261 | trace.debug("log recover stage: " + stage); |
262 | } |
263 | if (stage == RECOVERY_STAGE_ALLOCATE) { |
264 | PageInputStream in = new PageInputStream(store, |
265 | logKey, firstTrunkPage, firstDataPage); |
266 | usedLogPages = in.allocateAllPages(); |
267 | in.close(); |
268 | return true; |
269 | } |
270 | PageInputStream pageIn = new PageInputStream(store, |
271 | logKey, firstTrunkPage, firstDataPage); |
272 | DataReader in = new DataReader(pageIn); |
273 | int logId = 0; |
274 | Data data = store.createData(); |
275 | boolean isEmpty = true; |
276 | try { |
277 | int pos = 0; |
278 | while (true) { |
279 | int x = in.readByte(); |
280 | if (x < 0) { |
281 | break; |
282 | } |
283 | pos++; |
284 | isEmpty = false; |
285 | if (x == UNDO) { |
286 | int pageId = in.readVarInt(); |
287 | int size = in.readVarInt(); |
288 | if (size == 0) { |
289 | in.readFully(data.getBytes(), store.getPageSize()); |
290 | } else if (size == 1) { |
291 | // empty |
292 | Arrays.fill(data.getBytes(), 0, store.getPageSize(), (byte) 0); |
293 | } else { |
294 | in.readFully(compressBuffer, size); |
295 | try { |
296 | compress.expand(compressBuffer, 0, size, |
297 | data.getBytes(), 0, store.getPageSize()); |
298 | } catch (ArrayIndexOutOfBoundsException e) { |
299 | DbException.convertToIOException(e); |
300 | } |
301 | } |
302 | if (stage == RECOVERY_STAGE_UNDO) { |
303 | if (!undo.get(pageId)) { |
304 | if (trace.isDebugEnabled()) { |
305 | trace.debug("log undo {0}", pageId); |
306 | } |
307 | store.writePage(pageId, data); |
308 | undo.set(pageId); |
309 | undoAll.set(pageId); |
310 | } else { |
311 | if (trace.isDebugEnabled()) { |
312 | trace.debug("log undo skip {0}", pageId); |
313 | } |
314 | } |
315 | } |
316 | } else if (x == ADD) { |
317 | int sessionId = in.readVarInt(); |
318 | int tableId = in.readVarInt(); |
319 | Row row = readRow(in, data); |
320 | if (stage == RECOVERY_STAGE_UNDO) { |
321 | store.allocateIfIndexRoot(pos, tableId, row); |
322 | } else if (stage == RECOVERY_STAGE_REDO) { |
323 | if (isSessionCommitted(sessionId, logId, pos)) { |
324 | if (trace.isDebugEnabled()) { |
325 | trace.debug("log redo + table: " + tableId + |
326 | " s: " + sessionId + " " + row); |
327 | } |
328 | store.redo(tableId, row, true); |
329 | } else { |
330 | if (trace.isDebugEnabled()) { |
331 | trace.debug("log ignore s: " + sessionId + |
332 | " + table: " + tableId + " " + row); |
333 | } |
334 | } |
335 | } |
336 | } else if (x == REMOVE) { |
337 | int sessionId = in.readVarInt(); |
338 | int tableId = in.readVarInt(); |
339 | long key = in.readVarLong(); |
340 | if (stage == RECOVERY_STAGE_REDO) { |
341 | if (isSessionCommitted(sessionId, logId, pos)) { |
342 | if (trace.isDebugEnabled()) { |
343 | trace.debug("log redo - table: " + tableId + |
344 | " s:" + sessionId + " key: " + key); |
345 | } |
346 | store.redoDelete(tableId, key); |
347 | } else { |
348 | if (trace.isDebugEnabled()) { |
349 | trace.debug("log ignore s: " + sessionId + |
350 | " - table: " + tableId + " " + key); |
351 | } |
352 | } |
353 | } |
354 | } else if (x == TRUNCATE) { |
355 | int sessionId = in.readVarInt(); |
356 | int tableId = in.readVarInt(); |
357 | if (stage == RECOVERY_STAGE_REDO) { |
358 | if (isSessionCommitted(sessionId, logId, pos)) { |
359 | if (trace.isDebugEnabled()) { |
360 | trace.debug("log redo truncate table: " + tableId); |
361 | } |
362 | store.redoTruncate(tableId); |
363 | } else { |
364 | if (trace.isDebugEnabled()) { |
365 | trace.debug("log ignore s: "+ sessionId + |
366 | " truncate table: " + tableId); |
367 | } |
368 | } |
369 | } |
370 | } else if (x == PREPARE_COMMIT) { |
371 | int sessionId = in.readVarInt(); |
372 | String transaction = in.readString(); |
373 | if (trace.isDebugEnabled()) { |
374 | trace.debug("log prepare commit " + sessionId + " " + |
375 | transaction + " pos: " + pos); |
376 | } |
377 | if (stage == RECOVERY_STAGE_UNDO) { |
378 | int page = pageIn.getDataPage(); |
379 | setPrepareCommit(sessionId, page, transaction); |
380 | } |
381 | } else if (x == ROLLBACK) { |
382 | int sessionId = in.readVarInt(); |
383 | if (trace.isDebugEnabled()) { |
384 | trace.debug("log rollback " + sessionId + " pos: " + pos); |
385 | } |
386 | // ignore - this entry is just informational |
387 | } else if (x == COMMIT) { |
388 | int sessionId = in.readVarInt(); |
389 | if (trace.isDebugEnabled()) { |
390 | trace.debug("log commit " + sessionId + " pos: " + pos); |
391 | } |
392 | if (stage == RECOVERY_STAGE_UNDO) { |
393 | setLastCommitForSession(sessionId, logId, pos); |
394 | } |
395 | } else if (x == NOOP) { |
396 | // nothing to do |
397 | } else if (x == CHECKPOINT) { |
398 | logId++; |
399 | } else if (x == FREE_LOG) { |
400 | int count = in.readVarInt(); |
401 | for (int i = 0; i < count; i++) { |
402 | int pageId = in.readVarInt(); |
403 | if (stage == RECOVERY_STAGE_REDO) { |
404 | if (!usedLogPages.get(pageId)) { |
405 | store.free(pageId, false); |
406 | } |
407 | } |
408 | } |
409 | } else { |
410 | if (trace.isDebugEnabled()) { |
411 | trace.debug("log end"); |
412 | break; |
413 | } |
414 | } |
415 | } |
416 | } catch (DbException e) { |
417 | if (e.getErrorCode() == ErrorCode.FILE_CORRUPTED_1) { |
418 | trace.debug("log recovery stopped"); |
419 | } else { |
420 | throw e; |
421 | } |
422 | } catch (IOException e) { |
423 | trace.debug("log recovery completed"); |
424 | } |
425 | undo = new BitField(); |
426 | if (stage == RECOVERY_STAGE_REDO) { |
427 | usedLogPages = null; |
428 | } |
429 | return isEmpty; |
430 | } |
431 | |
432 | /** |
433 | * This method is called when a 'prepare commit' log entry is read when |
434 | * opening the database. |
435 | * |
436 | * @param sessionId the session id |
437 | * @param pageId the data page with the prepare entry |
438 | * @param transaction the transaction name, or null to rollback |
439 | */ |
440 | private void setPrepareCommit(int sessionId, int pageId, String transaction) { |
441 | SessionState state = getOrAddSessionState(sessionId); |
442 | PageStoreInDoubtTransaction doubt; |
443 | if (transaction == null) { |
444 | doubt = null; |
445 | } else { |
446 | doubt = new PageStoreInDoubtTransaction(store, sessionId, pageId, |
447 | transaction); |
448 | } |
449 | state.inDoubtTransaction = doubt; |
450 | } |
451 | |
452 | /** |
453 | * Read a row from an input stream. |
454 | * |
455 | * @param in the input stream |
456 | * @param data a temporary buffer |
457 | * @return the row |
458 | */ |
459 | public static Row readRow(DataReader in, Data data) throws IOException { |
460 | long key = in.readVarLong(); |
461 | int len = in.readVarInt(); |
462 | data.reset(); |
463 | data.checkCapacity(len); |
464 | in.readFully(data.getBytes(), len); |
465 | int columnCount = data.readVarInt(); |
466 | Value[] values = new Value[columnCount]; |
467 | for (int i = 0; i < columnCount; i++) { |
468 | values[i] = data.readValue(); |
469 | } |
470 | Row row = new Row(values, Row.MEMORY_CALCULATE); |
471 | row.setKey(key); |
472 | return row; |
473 | } |
474 | |
475 | /** |
476 | * Check if the undo entry was already written for the given page. |
477 | * |
478 | * @param pageId the page |
479 | * @return true if it was written |
480 | */ |
481 | boolean getUndo(int pageId) { |
482 | return undo.get(pageId); |
483 | } |
484 | |
485 | /** |
486 | * Add an undo entry to the log. The page data is only written once until |
487 | * the next checkpoint. |
488 | * |
489 | * @param pageId the page id |
490 | * @param page the old page data |
491 | */ |
492 | void addUndo(int pageId, Data page) { |
493 | if (undo.get(pageId) || freeing) { |
494 | return; |
495 | } |
496 | if (trace.isDebugEnabled()) { |
497 | trace.debug("log undo " + pageId); |
498 | } |
499 | if (SysProperties.CHECK) { |
500 | if (page == null) { |
501 | DbException.throwInternalError("Undo entry not written"); |
502 | } |
503 | } |
504 | undo.set(pageId); |
505 | undoAll.set(pageId); |
506 | Data buffer = getBuffer(); |
507 | buffer.writeByte((byte) UNDO); |
508 | buffer.writeVarInt(pageId); |
509 | if (page.getBytes()[0] == 0) { |
510 | buffer.writeVarInt(1); |
511 | } else { |
512 | int pageSize = store.getPageSize(); |
513 | if (COMPRESS_UNDO) { |
514 | int size = compress.compress(page.getBytes(), |
515 | pageSize, compressBuffer, 0); |
516 | if (size < pageSize) { |
517 | buffer.writeVarInt(size); |
518 | buffer.checkCapacity(size); |
519 | buffer.write(compressBuffer, 0, size); |
520 | } else { |
521 | buffer.writeVarInt(0); |
522 | buffer.checkCapacity(pageSize); |
523 | buffer.write(page.getBytes(), 0, pageSize); |
524 | } |
525 | } else { |
526 | buffer.writeVarInt(0); |
527 | buffer.checkCapacity(pageSize); |
528 | buffer.write(page.getBytes(), 0, pageSize); |
529 | } |
530 | } |
531 | write(buffer); |
532 | } |
533 | |
534 | private void freeLogPages(IntArray pages) { |
535 | if (trace.isDebugEnabled()) { |
536 | trace.debug("log frees " + pages.get(0) + ".." + |
537 | pages.get(pages.size() - 1)); |
538 | } |
539 | Data buffer = getBuffer(); |
540 | buffer.writeByte((byte) FREE_LOG); |
541 | int size = pages.size(); |
542 | buffer.writeVarInt(size); |
543 | for (int i = 0; i < size; i++) { |
544 | buffer.writeVarInt(pages.get(i)); |
545 | } |
546 | write(buffer); |
547 | } |
548 | |
549 | private void write(Data data) { |
550 | pageOut.write(data.getBytes(), 0, data.length()); |
551 | data.reset(); |
552 | } |
553 | |
554 | /** |
555 | * Mark a transaction as committed. |
556 | * |
557 | * @param sessionId the session |
558 | */ |
559 | void commit(int sessionId) { |
560 | if (trace.isDebugEnabled()) { |
561 | trace.debug("log commit s: " + sessionId); |
562 | } |
563 | if (store.getDatabase().getPageStore() == null) { |
564 | // database already closed |
565 | return; |
566 | } |
567 | Data buffer = getBuffer(); |
568 | buffer.writeByte((byte) COMMIT); |
569 | buffer.writeVarInt(sessionId); |
570 | write(buffer); |
571 | if (store.getDatabase().getFlushOnEachCommit()) { |
572 | flush(); |
573 | } |
574 | } |
575 | |
576 | /** |
577 | * Prepare a transaction. |
578 | * |
579 | * @param session the session |
580 | * @param transaction the name of the transaction |
581 | */ |
582 | void prepareCommit(Session session, String transaction) { |
583 | if (trace.isDebugEnabled()) { |
584 | trace.debug("log prepare commit s: " + session.getId() + ", " + transaction); |
585 | } |
586 | if (store.getDatabase().getPageStore() == null) { |
587 | // database already closed |
588 | return; |
589 | } |
590 | // store it on a separate log page |
591 | int pageSize = store.getPageSize(); |
592 | pageOut.flush(); |
593 | pageOut.fillPage(); |
594 | Data buffer = getBuffer(); |
595 | buffer.writeByte((byte) PREPARE_COMMIT); |
596 | buffer.writeVarInt(session.getId()); |
597 | buffer.writeString(transaction); |
598 | if (buffer.length() >= PageStreamData.getCapacity(pageSize)) { |
599 | throw DbException.getInvalidValueException( |
600 | "transaction name (too long)", transaction); |
601 | } |
602 | write(buffer); |
603 | // store it on a separate log page |
604 | flushOut(); |
605 | pageOut.fillPage(); |
606 | if (store.getDatabase().getFlushOnEachCommit()) { |
607 | flush(); |
608 | } |
609 | } |
610 | |
611 | /** |
612 | * A record is added to a table, or removed from a table. |
613 | * |
614 | * @param session the session |
615 | * @param tableId the table id |
616 | * @param row the row to add |
617 | * @param add true if the row is added, false if it is removed |
618 | */ |
619 | void logAddOrRemoveRow(Session session, int tableId, Row row, boolean add) { |
620 | if (trace.isDebugEnabled()) { |
621 | trace.debug("log " + (add ? "+" : "-") + |
622 | " s: " + session.getId() + " table: " + tableId + " row: " + row); |
623 | } |
624 | session.addLogPos(logSectionId, logPos); |
625 | logPos++; |
626 | Data data = dataBuffer; |
627 | data.reset(); |
628 | int columns = row.getColumnCount(); |
629 | data.writeVarInt(columns); |
630 | data.checkCapacity(row.getByteCount(data)); |
631 | if (session.isRedoLogBinaryEnabled()) { |
632 | for (int i = 0; i < columns; i++) { |
633 | data.writeValue(row.getValue(i)); |
634 | } |
635 | } else { |
636 | for (int i = 0; i < columns; i++) { |
637 | Value v = row.getValue(i); |
638 | if (v.getType() == Value.BYTES) { |
639 | data.writeValue(ValueNull.INSTANCE); |
640 | } else { |
641 | data.writeValue(v); |
642 | } |
643 | } |
644 | } |
645 | Data buffer = getBuffer(); |
646 | buffer.writeByte((byte) (add ? ADD : REMOVE)); |
647 | buffer.writeVarInt(session.getId()); |
648 | buffer.writeVarInt(tableId); |
649 | buffer.writeVarLong(row.getKey()); |
650 | if (add) { |
651 | buffer.writeVarInt(data.length()); |
652 | buffer.checkCapacity(data.length()); |
653 | buffer.write(data.getBytes(), 0, data.length()); |
654 | } |
655 | write(buffer); |
656 | } |
657 | |
658 | /** |
659 | * A table is truncated. |
660 | * |
661 | * @param session the session |
662 | * @param tableId the table id |
663 | */ |
664 | void logTruncate(Session session, int tableId) { |
665 | if (trace.isDebugEnabled()) { |
666 | trace.debug("log truncate s: " + session.getId() + " table: " + tableId); |
667 | } |
668 | session.addLogPos(logSectionId, logPos); |
669 | logPos++; |
670 | Data buffer = getBuffer(); |
671 | buffer.writeByte((byte) TRUNCATE); |
672 | buffer.writeVarInt(session.getId()); |
673 | buffer.writeVarInt(tableId); |
674 | write(buffer); |
675 | } |
676 | |
677 | /** |
678 | * Flush the transaction log. |
679 | */ |
680 | void flush() { |
681 | if (pageOut != null) { |
682 | flushOut(); |
683 | } |
684 | } |
685 | |
686 | /** |
687 | * Switch to a new log section. |
688 | */ |
689 | void checkpoint() { |
690 | Data buffer = getBuffer(); |
691 | buffer.writeByte((byte) CHECKPOINT); |
692 | write(buffer); |
693 | undo = new BitField(); |
694 | logSectionId++; |
695 | logPos = 0; |
696 | pageOut.flush(); |
697 | pageOut.fillPage(); |
698 | int currentDataPage = pageOut.getCurrentDataPageId(); |
699 | logSectionPageMap.put(logSectionId, currentDataPage); |
700 | } |
701 | |
/**
 * Get the id of the current log section. The id is incremented by each
 * checkpoint.
 *
 * @return the current log section id
 */
int getLogSectionId() {
    return logSectionId;
}
705 | |
/**
 * Get the id of the first log section that was not removed yet.
 *
 * @return the first log section id
 */
int getLogFirstSectionId() {
    return firstSectionId;
}
709 | |
/**
 * Get the position within the current log section. It is incremented for
 * each logged row or truncate operation, and reset by a checkpoint.
 *
 * @return the position in the current log section
 */
int getLogPos() {
    return logPos;
}
713 | |
714 | /** |
715 | * Remove all pages until the given log (excluding). |
716 | * |
717 | * @param firstUncommittedSection the first log section to keep |
718 | */ |
719 | void removeUntil(int firstUncommittedSection) { |
720 | if (firstUncommittedSection == 0) { |
721 | return; |
722 | } |
723 | int firstDataPageToKeep = logSectionPageMap.get(firstUncommittedSection); |
724 | firstTrunkPage = removeUntil(firstTrunkPage, firstDataPageToKeep); |
725 | store.setLogFirstPage(logKey, firstTrunkPage, firstDataPageToKeep); |
726 | while (firstSectionId < firstUncommittedSection) { |
727 | if (firstSectionId > 0) { |
728 | // there is no entry for log 0 |
729 | logSectionPageMap.remove(firstSectionId); |
730 | } |
731 | firstSectionId++; |
732 | } |
733 | } |
734 | |
735 | /** |
736 | * Remove all pages until the given data page. |
737 | * |
738 | * @param trunkPage the first trunk page |
739 | * @param firstDataPageToKeep the first data page to keep |
740 | * @return the trunk page of the data page to keep |
741 | */ |
742 | private int removeUntil(int trunkPage, int firstDataPageToKeep) { |
743 | trace.debug("log.removeUntil " + trunkPage + " " + firstDataPageToKeep); |
744 | int last = trunkPage; |
745 | while (true) { |
746 | Page p = store.getPage(trunkPage); |
747 | PageStreamTrunk t = (PageStreamTrunk) p; |
748 | if (t == null) { |
749 | throw DbException.throwInternalError( |
750 | "log.removeUntil not found: " + firstDataPageToKeep + " last " + last); |
751 | } |
752 | logKey = t.getLogKey(); |
753 | last = t.getPos(); |
754 | if (t.contains(firstDataPageToKeep)) { |
755 | return last; |
756 | } |
757 | trunkPage = t.getNextTrunk(); |
758 | IntArray list = new IntArray(); |
759 | list.add(t.getPos()); |
760 | for (int i = 0;; i++) { |
761 | int next = t.getPageData(i); |
762 | if (next == -1) { |
763 | break; |
764 | } |
765 | list.add(next); |
766 | } |
767 | freeLogPages(list); |
768 | pageOut.free(t); |
769 | } |
770 | } |
771 | |
772 | /** |
773 | * Close without further writing. |
774 | */ |
775 | void close() { |
776 | trace.debug("log close"); |
777 | if (pageOut != null) { |
778 | pageOut.close(); |
779 | pageOut = null; |
780 | } |
781 | writeBuffer = null; |
782 | } |
783 | |
784 | /** |
785 | * Check if the session committed after than the given position. |
786 | * |
787 | * @param sessionId the session id |
788 | * @param logId the log id |
789 | * @param pos the position in the log |
790 | * @return true if it is committed |
791 | */ |
792 | private boolean isSessionCommitted(int sessionId, int logId, int pos) { |
793 | SessionState state = sessionStates.get(sessionId); |
794 | if (state == null) { |
795 | return false; |
796 | } |
797 | return state.isCommitted(logId, pos); |
798 | } |
799 | |
800 | /** |
801 | * Set the last commit record for a session. |
802 | * |
803 | * @param sessionId the session id |
804 | * @param logId the log id |
805 | * @param pos the position in the log |
806 | */ |
807 | private void setLastCommitForSession(int sessionId, int logId, int pos) { |
808 | SessionState state = getOrAddSessionState(sessionId); |
809 | state.lastCommitLog = logId; |
810 | state.lastCommitPos = pos; |
811 | state.inDoubtTransaction = null; |
812 | } |
813 | |
814 | /** |
815 | * Get the session state for this session. A new object is created if there |
816 | * is no session state yet. |
817 | * |
818 | * @param sessionId the session id |
819 | * @return the session state object |
820 | */ |
821 | private SessionState getOrAddSessionState(int sessionId) { |
822 | Integer key = sessionId; |
823 | SessionState state = sessionStates.get(key); |
824 | if (state == null) { |
825 | state = new SessionState(); |
826 | sessionStates.put(key, state); |
827 | state.sessionId = sessionId; |
828 | } |
829 | return state; |
830 | } |
831 | |
832 | long getSize() { |
833 | return pageOut == null ? 0 : pageOut.getSize(); |
834 | } |
835 | |
836 | ArrayList<InDoubtTransaction> getInDoubtTransactions() { |
837 | ArrayList<InDoubtTransaction> list = New.arrayList(); |
838 | for (SessionState state : sessionStates.values()) { |
839 | PageStoreInDoubtTransaction in = state.inDoubtTransaction; |
840 | if (in != null) { |
841 | list.add(in); |
842 | } |
843 | } |
844 | return list; |
845 | } |
846 | |
847 | /** |
848 | * Set the state of an in-doubt transaction. |
849 | * |
850 | * @param sessionId the session |
851 | * @param pageId the page where the commit was prepared |
852 | * @param commit whether the transaction should be committed |
853 | */ |
854 | void setInDoubtTransactionState(int sessionId, int pageId, boolean commit) { |
855 | PageStreamData d = (PageStreamData) store.getPage(pageId); |
856 | d.initWrite(); |
857 | Data buff = store.createData(); |
858 | buff.writeByte((byte) (commit ? COMMIT : ROLLBACK)); |
859 | buff.writeVarInt(sessionId); |
860 | byte[] bytes = buff.getBytes(); |
861 | d.write(bytes, 0, bytes.length); |
862 | bytes = new byte[d.getRemaining()]; |
863 | d.write(bytes, 0, bytes.length); |
864 | d.write(); |
865 | } |
866 | |
/**
 * Called after the recovery has been completed. The session states
 * collected during recovery are discarded (the map is replaced by a new,
 * empty one).
 */
void recoverEnd() {
    sessionStates = New.hashMap();
}
873 | |
/**
 * Flush the log output stream. The stream must be open (not null).
 */
private void flushOut() {
    pageOut.flush();
}
877 | |
878 | private Data getBuffer() { |
879 | if (writeBuffer.length() == 0) { |
880 | return writeBuffer; |
881 | } |
882 | return store.createData(); |
883 | } |
884 | |
885 | |
886 | /** |
887 | * Get the smallest possible page id used. This is the trunk page if only |
888 | * appending at the end of the file, or 0. |
889 | * |
890 | * @return the smallest possible page. |
891 | */ |
892 | int getMinPageId() { |
893 | return pageOut == null ? 0 : pageOut.getMinPageId(); |
894 | } |
895 | |
896 | } |