EMMA Coverage Report (generated Sun Mar 01 22:06:14 CET 2015)
[all classes][org.h2.store]

COVERAGE SUMMARY FOR SOURCE FILE [PageLog.java]

nameclass, %method, %block, %line, %
PageLog.java100% (1/1)64%  (21/33)38%  (649/1710)42%  (165/389)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class PageLog100% (1/1)64%  (21/33)38%  (649/1710)42%  (165/389)
freeLogPages (IntArray): void 0%   (0/1)0%   (0/54)0%   (0/10)
getLogFirstSectionId (): int 0%   (0/1)0%   (0/3)0%   (0/1)
getLogPos (): int 0%   (0/1)0%   (0/3)0%   (0/1)
getMinPageId (): int 0%   (0/1)0%   (0/9)0%   (0/1)
getOrAddSessionState (int): SessionState 0%   (0/1)0%   (0/26)0%   (0/7)
isSessionCommitted (int, int, int): boolean 0%   (0/1)0%   (0/16)0%   (0/4)
logTruncate (Session, int): void 0%   (0/1)0%   (0/49)0%   (0/10)
prepareCommit (Session, String): void 0%   (0/1)0%   (0/74)0%   (0/19)
readRow (DataReader, Data): Row 0%   (0/1)0%   (0/45)0%   (0/12)
setInDoubtTransactionState (int, int, boolean): void 0%   (0/1)0%   (0/47)0%   (0/11)
setLastCommitForSession (int, int, int): void 0%   (0/1)0%   (0/14)0%   (0/5)
setPrepareCommit (int, int, String): void 0%   (0/1)0%   (0/22)0%   (0/6)
recover (int): boolean 100% (1/1)12%  (72/579)15%  (18/117)
removeUntil (int, int): int 100% (1/1)45%  (40/88)43%  (9/21)
getInDoubtTransactions (): ArrayList 100% (1/1)46%  (12/26)36%  (2.5/7)
getBuffer (): Data 100% (1/1)64%  (7/11)67%  (2/3)
logAddOrRemoveRow (Session, int, Row, boolean): void 100% (1/1)65%  (91/141)79%  (22/28)
commit (int): void 100% (1/1)66%  (27/41)73%  (8/11)
free (): void 100% (1/1)69%  (77/111)77%  (20.7/27)
addUndo (int, Data): void 100% (1/1)75%  (77/103)80%  (20/25)
getSize (): long 100% (1/1)78%  (7/9)77%  (0.8/1)
PageLog (PageStore): void 100% (1/1)100% (44/44)100% (11/11)
checkpoint (): void 100% (1/1)100% (40/40)100% (11/11)
close (): void 100% (1/1)100% (17/17)100% (6/6)
flush (): void 100% (1/1)100% (6/6)100% (3/3)
flushOut (): void 100% (1/1)100% (4/4)100% (2/2)
getLogSectionId (): int 100% (1/1)100% (3/3)100% (1/1)
getUndo (int): boolean 100% (1/1)100% (5/5)100% (1/1)
openForReading (int, int, int): void 100% (1/1)100% (10/10)100% (4/4)
openForWriting (int, boolean): void 100% (1/1)100% (52/52)100% (8/8)
recoverEnd (): void 100% (1/1)100% (4/4)100% (2/2)
removeUntil (int): void 100% (1/1)100% (43/43)100% (10/10)
write (Data): void 100% (1/1)100% (11/11)100% (3/3)

1/*
2 * Copyright 2004-2014 H2 Group. Multiple-Licensed under the MPL 2.0,
3 * and the EPL 1.0 (http://h2database.com/html/license.html).
4 * Initial Developer: H2 Group
5 */
6package org.h2.store;
7 
8import java.io.IOException;
9import java.util.ArrayList;
10import java.util.Arrays;
11import java.util.HashMap;
12 
13import org.h2.api.ErrorCode;
14import org.h2.compress.CompressLZF;
15import org.h2.engine.Session;
16import org.h2.engine.SysProperties;
17import org.h2.message.DbException;
18import org.h2.message.Trace;
19import org.h2.result.Row;
20import org.h2.util.BitField;
21import org.h2.util.IntArray;
22import org.h2.util.IntIntHashMap;
23import org.h2.util.New;
24import org.h2.value.Value;
25import org.h2.value.ValueNull;
26 
27/**
28 * Transaction log mechanism. The stream contains a list of records. The data
29 * format for a record is:
30 * <ul>
31 * <li>type (0: no-op, 1: undo, 2: commit, ...)</li>
32 * <li>data</li>
33 * </ul>
34 * The transaction log is split into sections.
35 * A checkpoint starts a new section.
36 */
37public class PageLog {
38 
39    /**
40     * No operation.
41     */
42    public static final int NOOP = 0;
43 
44    /**
45     * An undo log entry. Format: page id: varInt, size, page. Size 0 means
46     * uncompressed, size 1 means empty page, otherwise the size is the number
47     * of compressed bytes.
48     */
49    public static final int UNDO = 1;
50 
51    /**
52     * A commit entry of a session.
53     * Format: session id: varInt.
54     */
55    public static final int COMMIT = 2;
56 
57    /**
58     * A prepare commit entry for a session.
59     * Format: session id: varInt, transaction name: string.
60     */
61    public static final int PREPARE_COMMIT = 3;
62 
63    /**
64     * Roll back a prepared transaction.
65     * Format: session id: varInt.
66     */
67    public static final int ROLLBACK = 4;
68 
69    /**
70     * Add a record to a table.
71     * Format: session id: varInt, table id: varInt, row.
72     */
73    public static final int ADD = 5;
74 
75    /**
76     * Remove a record from a table.
77     * Format: session id: varInt, table id: varInt, row.
78     */
79    public static final int REMOVE = 6;
80 
81    /**
82     * Truncate a table.
83     * Format: session id: varInt, table id: varInt.
84     */
85    public static final int TRUNCATE = 7;
86 
87    /**
88     * Perform a checkpoint. The log section id is incremented.
89     * Format: -
90     */
91    public static final int CHECKPOINT = 8;
92 
93    /**
94     * Free a log page.
95     * Format: count: varInt, page ids: varInt
96     */
97    public static final int FREE_LOG = 9;
98 
99    /**
100     * The recovery stage to undo changes (re-apply the backup).
101     */
102    static final int RECOVERY_STAGE_UNDO = 0;
103 
104    /**
105     * The recovery stage to allocate pages used by the transaction log.
106     */
107    static final int RECOVERY_STAGE_ALLOCATE = 1;
108 
109    /**
110     * The recovery stage to redo operations.
111     */
112    static final int RECOVERY_STAGE_REDO = 2;
113 
114    private static final boolean COMPRESS_UNDO = true;
115 
116    private final PageStore store;
117    private final Trace trace;
118 
119    private Data writeBuffer;
120    private PageOutputStream pageOut;
121    private int firstTrunkPage;
122    private int firstDataPage;
123    private final Data dataBuffer;
124    private int logKey;
125    private int logSectionId, logPos;
126    private int firstSectionId;
127 
128    private final CompressLZF compress;
129    private final byte[] compressBuffer;
130 
131    /**
132     * If the bit is set, the given page was written to the current log section.
133     * The undo entry of these pages doesn't need to be written again.
134     */
135    private BitField undo = new BitField();
136 
137    /**
138     * The undo entry of those pages was written in any log section.
139     * These pages may not be used in the transaction log.
140     */
141    private final BitField undoAll = new BitField();
142 
143    /**
144     * The map of section ids (key) and data page where the section starts
145     * (value).
146     */
147    private final IntIntHashMap logSectionPageMap = new IntIntHashMap();
148 
149    /**
150     * The session state map.
151     * Only used during recovery.
152     */
153    private HashMap<Integer, SessionState> sessionStates = New.hashMap();
154 
155    /**
156     * The map of pages used by the transaction log.
157     * Only used during recovery.
158     */
159    private BitField usedLogPages;
160 
161    /**
162     * This flag is set while freeing up pages.
163     */
164    private boolean freeing;
165 
166    PageLog(PageStore store) {
167        this.store = store;
168        dataBuffer = store.createData();
169        trace = store.getTrace();
170        compress = new CompressLZF();
171        compressBuffer = new byte[store.getPageSize() * 2];
172    }
173 
174    /**
175     * Open the log for writing. For an existing database, the recovery
176     * must be run first.
177     *
178     * @param newFirstTrunkPage the first trunk page
179     * @param atEnd whether only pages at the end of the file should be used
180     */
181    void openForWriting(int newFirstTrunkPage, boolean atEnd) {
182        trace.debug("log openForWriting firstPage: " + newFirstTrunkPage);
183        this.firstTrunkPage = newFirstTrunkPage;
184        logKey++;
185        pageOut = new PageOutputStream(store,
186                newFirstTrunkPage, undoAll, logKey, atEnd);
187        pageOut.reserve(1);
188        // pageBuffer = new BufferedOutputStream(pageOut, 8 * 1024);
189        store.setLogFirstPage(logKey, newFirstTrunkPage,
190                pageOut.getCurrentDataPageId());
191        writeBuffer = store.createData();
192    }
193 
194    /**
195     * Free up all pages allocated by the log.
196     */
197    void free() {
198        if (trace.isDebugEnabled()) {
199            trace.debug("log free");
200        }
201        int currentDataPage = 0;
202        if (pageOut != null) {
203            currentDataPage = pageOut.getCurrentDataPageId();
204            pageOut.freeReserved();
205        }
206        try {
207            freeing = true;
208            int first = 0;
209            int loopDetect = 1024, loopCount = 0;
210            PageStreamTrunk.Iterator it = new PageStreamTrunk.Iterator(
211                    store, firstTrunkPage);
212            while (firstTrunkPage != 0 && firstTrunkPage < store.getPageCount()) {
213                PageStreamTrunk t = it.next();
214                if (t == null) {
215                    if (it.canDelete()) {
216                        store.free(firstTrunkPage, false);
217                    }
218                    break;
219                }
220                if (loopCount++ >= loopDetect) {
221                    first = t.getPos();
222                    loopCount = 0;
223                    loopDetect *= 2;
224                } else if (first != 0 && first == t.getPos()) {
225                    throw DbException.throwInternalError(
226                            "endless loop at " + t);
227                }
228                t.free(currentDataPage);
229                firstTrunkPage = t.getNextTrunk();
230            }
231        } finally {
232            freeing = false;
233        }
234    }
235 
236    /**
237     * Open the log for reading.
238     *
239     * @param newLogKey the first expected log key
240     * @param newFirstTrunkPage the first trunk page
241     * @param newFirstDataPage the index of the first data page
242     */
243    void openForReading(int newLogKey, int newFirstTrunkPage,
244            int newFirstDataPage) {
245        this.logKey = newLogKey;
246        this.firstTrunkPage = newFirstTrunkPage;
247        this.firstDataPage = newFirstDataPage;
248    }
249 
250    /**
251     * Run one recovery stage. There are three recovery stages: 0: only the undo
252     * steps are run (restoring the state before the last checkpoint). 1: the
253     * pages that are used by the transaction log are allocated. 2: the
254     * committed operations are re-applied.
255     *
256     * @param stage the recovery stage
257     * @return whether the transaction log was empty
258     */
259    boolean recover(int stage) {
260        if (trace.isDebugEnabled()) {
261            trace.debug("log recover stage: " + stage);
262        }
263        if (stage == RECOVERY_STAGE_ALLOCATE) {
264            PageInputStream in = new PageInputStream(store,
265                    logKey, firstTrunkPage, firstDataPage);
266            usedLogPages = in.allocateAllPages();
267            in.close();
268            return true;
269        }
270        PageInputStream pageIn = new PageInputStream(store,
271                logKey, firstTrunkPage, firstDataPage);
272        DataReader in = new DataReader(pageIn);
273        int logId = 0;
274        Data data = store.createData();
275        boolean isEmpty = true;
276        try {
277            int pos = 0;
278            while (true) {
279                int x = in.readByte();
280                if (x < 0) {
281                    break;
282                }
283                pos++;
284                isEmpty = false;
285                if (x == UNDO) {
286                    int pageId = in.readVarInt();
287                    int size = in.readVarInt();
288                    if (size == 0) {
289                        in.readFully(data.getBytes(), store.getPageSize());
290                    } else if (size == 1) {
291                        // empty
292                        Arrays.fill(data.getBytes(), 0, store.getPageSize(), (byte) 0);
293                    } else {
294                        in.readFully(compressBuffer, size);
295                        try {
296                            compress.expand(compressBuffer, 0, size,
297                                    data.getBytes(), 0, store.getPageSize());
298                        } catch (ArrayIndexOutOfBoundsException e) {
299                            DbException.convertToIOException(e);
300                        }
301                    }
302                    if (stage == RECOVERY_STAGE_UNDO) {
303                        if (!undo.get(pageId)) {
304                            if (trace.isDebugEnabled()) {
305                                trace.debug("log undo {0}", pageId);
306                            }
307                            store.writePage(pageId, data);
308                            undo.set(pageId);
309                            undoAll.set(pageId);
310                        } else {
311                            if (trace.isDebugEnabled()) {
312                                trace.debug("log undo skip {0}", pageId);
313                            }
314                        }
315                    }
316                } else if (x == ADD) {
317                    int sessionId = in.readVarInt();
318                    int tableId = in.readVarInt();
319                    Row row = readRow(in, data);
320                    if (stage == RECOVERY_STAGE_UNDO) {
321                        store.allocateIfIndexRoot(pos, tableId, row);
322                    } else if (stage == RECOVERY_STAGE_REDO) {
323                        if (isSessionCommitted(sessionId, logId, pos)) {
324                            if (trace.isDebugEnabled()) {
325                                trace.debug("log redo + table: " + tableId +
326                                        " s: " + sessionId + " " + row);
327                            }
328                            store.redo(tableId, row, true);
329                        } else {
330                            if (trace.isDebugEnabled()) {
331                                trace.debug("log ignore s: " + sessionId +
332                                        " + table: " + tableId + " " + row);
333                            }
334                        }
335                    }
336                } else if (x == REMOVE) {
337                    int sessionId = in.readVarInt();
338                    int tableId = in.readVarInt();
339                    long key = in.readVarLong();
340                    if (stage == RECOVERY_STAGE_REDO) {
341                        if (isSessionCommitted(sessionId, logId, pos)) {
342                            if (trace.isDebugEnabled()) {
343                                trace.debug("log redo - table: " + tableId +
344                                        " s:" + sessionId + " key: " + key);
345                            }
346                            store.redoDelete(tableId, key);
347                        } else {
348                            if (trace.isDebugEnabled()) {
349                                trace.debug("log ignore s: " + sessionId +
350                                        " - table: " + tableId + " " + key);
351                            }
352                        }
353                    }
354                } else if (x == TRUNCATE) {
355                    int sessionId = in.readVarInt();
356                    int tableId = in.readVarInt();
357                    if (stage == RECOVERY_STAGE_REDO) {
358                        if (isSessionCommitted(sessionId, logId, pos)) {
359                            if (trace.isDebugEnabled()) {
360                                trace.debug("log redo truncate table: " + tableId);
361                            }
362                            store.redoTruncate(tableId);
363                        } else {
364                            if (trace.isDebugEnabled()) {
365                                trace.debug("log ignore s: "+ sessionId +
366                                        " truncate table: " + tableId);
367                            }
368                        }
369                    }
370                } else if (x == PREPARE_COMMIT) {
371                    int sessionId = in.readVarInt();
372                    String transaction = in.readString();
373                    if (trace.isDebugEnabled()) {
374                        trace.debug("log prepare commit " + sessionId + " " +
375                                transaction + " pos: " + pos);
376                    }
377                    if (stage == RECOVERY_STAGE_UNDO) {
378                        int page = pageIn.getDataPage();
379                        setPrepareCommit(sessionId, page, transaction);
380                    }
381                } else if (x == ROLLBACK) {
382                    int sessionId = in.readVarInt();
383                    if (trace.isDebugEnabled()) {
384                        trace.debug("log rollback " + sessionId + " pos: " + pos);
385                    }
386                    // ignore - this entry is just informational
387                } else if (x == COMMIT) {
388                    int sessionId = in.readVarInt();
389                    if (trace.isDebugEnabled()) {
390                        trace.debug("log commit " + sessionId + " pos: " + pos);
391                    }
392                    if (stage == RECOVERY_STAGE_UNDO) {
393                        setLastCommitForSession(sessionId, logId, pos);
394                    }
395                } else  if (x == NOOP) {
396                    // nothing to do
397                } else if (x == CHECKPOINT) {
398                    logId++;
399                } else if (x == FREE_LOG) {
400                    int count = in.readVarInt();
401                    for (int i = 0; i < count; i++) {
402                        int pageId = in.readVarInt();
403                        if (stage == RECOVERY_STAGE_REDO) {
404                            if (!usedLogPages.get(pageId)) {
405                                store.free(pageId, false);
406                            }
407                        }
408                    }
409                } else {
410                    if (trace.isDebugEnabled()) {
411                        trace.debug("log end");
412                        break;
413                    }
414                }
415            }
416        } catch (DbException e) {
417            if (e.getErrorCode() == ErrorCode.FILE_CORRUPTED_1) {
418                trace.debug("log recovery stopped");
419            } else {
420                throw e;
421            }
422        } catch (IOException e) {
423            trace.debug("log recovery completed");
424        }
425        undo = new BitField();
426        if (stage == RECOVERY_STAGE_REDO) {
427            usedLogPages = null;
428        }
429        return isEmpty;
430    }
431 
432    /**
433     * This method is called when a 'prepare commit' log entry is read when
434     * opening the database.
435     *
436     * @param sessionId the session id
437     * @param pageId the data page with the prepare entry
438     * @param transaction the transaction name, or null to rollback
439     */
440    private void setPrepareCommit(int sessionId, int pageId, String transaction) {
441        SessionState state = getOrAddSessionState(sessionId);
442        PageStoreInDoubtTransaction doubt;
443        if (transaction == null) {
444            doubt = null;
445        } else {
446            doubt = new PageStoreInDoubtTransaction(store, sessionId, pageId,
447                    transaction);
448        }
449        state.inDoubtTransaction = doubt;
450    }
451 
452    /**
453     * Read a row from an input stream.
454     *
455     * @param in the input stream
456     * @param data a temporary buffer
457     * @return the row
458     */
459    public static Row readRow(DataReader in, Data data) throws IOException {
460        long key = in.readVarLong();
461        int len = in.readVarInt();
462        data.reset();
463        data.checkCapacity(len);
464        in.readFully(data.getBytes(), len);
465        int columnCount = data.readVarInt();
466        Value[] values = new Value[columnCount];
467        for (int i = 0; i < columnCount; i++) {
468            values[i] = data.readValue();
469        }
470        Row row = new Row(values, Row.MEMORY_CALCULATE);
471        row.setKey(key);
472        return row;
473    }
474 
475    /**
476     * Check if the undo entry was already written for the given page.
477     *
478     * @param pageId the page
479     * @return true if it was written
480     */
481    boolean getUndo(int pageId) {
482        return undo.get(pageId);
483    }
484 
485    /**
486     * Add an undo entry to the log. The page data is only written once until
487     * the next checkpoint.
488     *
489     * @param pageId the page id
490     * @param page the old page data
491     */
492    void addUndo(int pageId, Data page) {
493        if (undo.get(pageId) || freeing) {
494            return;
495        }
496        if (trace.isDebugEnabled()) {
497            trace.debug("log undo " + pageId);
498        }
499        if (SysProperties.CHECK) {
500            if (page == null) {
501                DbException.throwInternalError("Undo entry not written");
502            }
503        }
504        undo.set(pageId);
505        undoAll.set(pageId);
506        Data buffer = getBuffer();
507        buffer.writeByte((byte) UNDO);
508        buffer.writeVarInt(pageId);
509        if (page.getBytes()[0] == 0) {
510            buffer.writeVarInt(1);
511        } else {
512            int pageSize = store.getPageSize();
513            if (COMPRESS_UNDO) {
514                int size = compress.compress(page.getBytes(),
515                        pageSize, compressBuffer, 0);
516                if (size < pageSize) {
517                    buffer.writeVarInt(size);
518                    buffer.checkCapacity(size);
519                    buffer.write(compressBuffer, 0, size);
520                } else {
521                    buffer.writeVarInt(0);
522                    buffer.checkCapacity(pageSize);
523                    buffer.write(page.getBytes(), 0, pageSize);
524                }
525            } else {
526                buffer.writeVarInt(0);
527                buffer.checkCapacity(pageSize);
528                buffer.write(page.getBytes(), 0, pageSize);
529            }
530        }
531        write(buffer);
532    }
533 
534    private void freeLogPages(IntArray pages) {
535        if (trace.isDebugEnabled()) {
536            trace.debug("log frees " + pages.get(0) + ".." +
537                    pages.get(pages.size() - 1));
538        }
539        Data buffer = getBuffer();
540        buffer.writeByte((byte) FREE_LOG);
541        int size = pages.size();
542        buffer.writeVarInt(size);
543        for (int i = 0; i < size; i++) {
544            buffer.writeVarInt(pages.get(i));
545        }
546        write(buffer);
547    }
548 
549    private void write(Data data) {
550        pageOut.write(data.getBytes(), 0, data.length());
551        data.reset();
552    }
553 
554    /**
555     * Mark a transaction as committed.
556     *
557     * @param sessionId the session
558     */
559    void commit(int sessionId) {
560        if (trace.isDebugEnabled()) {
561            trace.debug("log commit s: " + sessionId);
562        }
563        if (store.getDatabase().getPageStore() == null) {
564            // database already closed
565            return;
566        }
567        Data buffer = getBuffer();
568        buffer.writeByte((byte) COMMIT);
569        buffer.writeVarInt(sessionId);
570        write(buffer);
571        if (store.getDatabase().getFlushOnEachCommit()) {
572            flush();
573        }
574    }
575 
576    /**
577     * Prepare a transaction.
578     *
579     * @param session the session
580     * @param transaction the name of the transaction
581     */
582    void prepareCommit(Session session, String transaction) {
583        if (trace.isDebugEnabled()) {
584            trace.debug("log prepare commit s: " + session.getId() + ", " + transaction);
585        }
586        if (store.getDatabase().getPageStore() == null) {
587            // database already closed
588            return;
589        }
590        // store it on a separate log page
591        int pageSize = store.getPageSize();
592        pageOut.flush();
593        pageOut.fillPage();
594        Data buffer = getBuffer();
595        buffer.writeByte((byte) PREPARE_COMMIT);
596        buffer.writeVarInt(session.getId());
597        buffer.writeString(transaction);
598        if (buffer.length()  >= PageStreamData.getCapacity(pageSize)) {
599            throw DbException.getInvalidValueException(
600                    "transaction name (too long)", transaction);
601        }
602        write(buffer);
603        // store it on a separate log page
604        flushOut();
605        pageOut.fillPage();
606        if (store.getDatabase().getFlushOnEachCommit()) {
607            flush();
608        }
609    }
610 
611    /**
612     * A record is added to a table, or removed from a table.
613     *
614     * @param session the session
615     * @param tableId the table id
616     * @param row the row to add
617     * @param add true if the row is added, false if it is removed
618     */
619    void logAddOrRemoveRow(Session session, int tableId, Row row, boolean add) {
620        if (trace.isDebugEnabled()) {
621            trace.debug("log " + (add ? "+" : "-") +
622                    " s: " + session.getId() + " table: " + tableId + " row: " + row);
623        }
624        session.addLogPos(logSectionId, logPos);
625        logPos++;
626        Data data = dataBuffer;
627        data.reset();
628        int columns = row.getColumnCount();
629        data.writeVarInt(columns);
630        data.checkCapacity(row.getByteCount(data));
631        if (session.isRedoLogBinaryEnabled()) {
632            for (int i = 0; i < columns; i++) {
633                data.writeValue(row.getValue(i));
634            }
635        } else {
636            for (int i = 0; i < columns; i++) {
637                Value v = row.getValue(i);
638                if (v.getType() == Value.BYTES) {
639                    data.writeValue(ValueNull.INSTANCE);
640                } else {
641                    data.writeValue(v);
642                }
643            }
644        }
645        Data buffer = getBuffer();
646        buffer.writeByte((byte) (add ? ADD : REMOVE));
647        buffer.writeVarInt(session.getId());
648        buffer.writeVarInt(tableId);
649        buffer.writeVarLong(row.getKey());
650        if (add) {
651            buffer.writeVarInt(data.length());
652            buffer.checkCapacity(data.length());
653            buffer.write(data.getBytes(), 0, data.length());
654        }
655        write(buffer);
656    }
657 
658    /**
659     * A table is truncated.
660     *
661     * @param session the session
662     * @param tableId the table id
663     */
664    void logTruncate(Session session, int tableId) {
665        if (trace.isDebugEnabled()) {
666            trace.debug("log truncate s: " + session.getId() + " table: " + tableId);
667        }
668        session.addLogPos(logSectionId, logPos);
669        logPos++;
670        Data buffer = getBuffer();
671        buffer.writeByte((byte) TRUNCATE);
672        buffer.writeVarInt(session.getId());
673        buffer.writeVarInt(tableId);
674        write(buffer);
675    }
676 
677    /**
678     * Flush the transaction log.
679     */
680    void flush() {
681        if (pageOut != null) {
682            flushOut();
683        }
684    }
685 
686    /**
687     * Switch to a new log section.
688     */
689    void checkpoint() {
690        Data buffer = getBuffer();
691        buffer.writeByte((byte) CHECKPOINT);
692        write(buffer);
693        undo = new BitField();
694        logSectionId++;
695        logPos = 0;
696        pageOut.flush();
697        pageOut.fillPage();
698        int currentDataPage = pageOut.getCurrentDataPageId();
699        logSectionPageMap.put(logSectionId, currentDataPage);
700    }
701 
702    int getLogSectionId() {
703        return logSectionId;
704    }
705 
706    int getLogFirstSectionId() {
707        return firstSectionId;
708    }
709 
710    int getLogPos() {
711        return logPos;
712    }
713 
714    /**
715     * Remove all pages until the given log (excluding).
716     *
717     * @param firstUncommittedSection the first log section to keep
718     */
719    void removeUntil(int firstUncommittedSection) {
720        if (firstUncommittedSection == 0) {
721            return;
722        }
723        int firstDataPageToKeep = logSectionPageMap.get(firstUncommittedSection);
724        firstTrunkPage = removeUntil(firstTrunkPage, firstDataPageToKeep);
725        store.setLogFirstPage(logKey, firstTrunkPage, firstDataPageToKeep);
726        while (firstSectionId < firstUncommittedSection) {
727            if (firstSectionId > 0) {
728                // there is no entry for log 0
729                logSectionPageMap.remove(firstSectionId);
730            }
731            firstSectionId++;
732        }
733    }
734 
735    /**
736     * Remove all pages until the given data page.
737     *
738     * @param trunkPage the first trunk page
739     * @param firstDataPageToKeep the first data page to keep
740     * @return the trunk page of the data page to keep
741     */
742    private int removeUntil(int trunkPage, int firstDataPageToKeep) {
743        trace.debug("log.removeUntil " + trunkPage + " " + firstDataPageToKeep);
744        int last = trunkPage;
745        while (true) {
746            Page p = store.getPage(trunkPage);
747            PageStreamTrunk t = (PageStreamTrunk) p;
748            if (t == null) {
749                throw DbException.throwInternalError(
750                        "log.removeUntil not found: " + firstDataPageToKeep + " last " + last);
751            }
752            logKey = t.getLogKey();
753            last = t.getPos();
754            if (t.contains(firstDataPageToKeep)) {
755                return last;
756            }
757            trunkPage = t.getNextTrunk();
758            IntArray list = new IntArray();
759            list.add(t.getPos());
760            for (int i = 0;; i++) {
761                int next = t.getPageData(i);
762                if (next == -1) {
763                    break;
764                }
765                list.add(next);
766            }
767            freeLogPages(list);
768            pageOut.free(t);
769        }
770    }
771 
772    /**
773     * Close without further writing.
774     */
775    void close() {
776        trace.debug("log close");
777        if (pageOut != null) {
778            pageOut.close();
779            pageOut = null;
780        }
781        writeBuffer = null;
782    }
783 
784    /**
785     * Check if the session committed after than the given position.
786     *
787     * @param sessionId the session id
788     * @param logId the log id
789     * @param pos the position in the log
790     * @return true if it is committed
791     */
792    private boolean isSessionCommitted(int sessionId, int logId, int pos) {
793        SessionState state = sessionStates.get(sessionId);
794        if (state == null) {
795            return false;
796        }
797        return state.isCommitted(logId, pos);
798    }
799 
800    /**
801     * Set the last commit record for a session.
802     *
803     * @param sessionId the session id
804     * @param logId the log id
805     * @param pos the position in the log
806     */
807    private void setLastCommitForSession(int sessionId, int logId, int pos) {
808        SessionState state = getOrAddSessionState(sessionId);
809        state.lastCommitLog = logId;
810        state.lastCommitPos = pos;
811        state.inDoubtTransaction = null;
812    }
813 
814    /**
815     * Get the session state for this session. A new object is created if there
816     * is no session state yet.
817     *
818     * @param sessionId the session id
819     * @return the session state object
820     */
821    private SessionState getOrAddSessionState(int sessionId) {
822        Integer key = sessionId;
823        SessionState state = sessionStates.get(key);
824        if (state == null) {
825            state = new SessionState();
826            sessionStates.put(key, state);
827            state.sessionId = sessionId;
828        }
829        return state;
830    }
831 
832    long getSize() {
833        return pageOut == null ? 0 : pageOut.getSize();
834    }
835 
836    ArrayList<InDoubtTransaction> getInDoubtTransactions() {
837        ArrayList<InDoubtTransaction> list = New.arrayList();
838        for (SessionState state : sessionStates.values()) {
839            PageStoreInDoubtTransaction in = state.inDoubtTransaction;
840            if (in != null) {
841                list.add(in);
842            }
843        }
844        return list;
845    }
846 
847    /**
848     * Set the state of an in-doubt transaction.
849     *
850     * @param sessionId the session
851     * @param pageId the page where the commit was prepared
852     * @param commit whether the transaction should be committed
853     */
854    void setInDoubtTransactionState(int sessionId, int pageId, boolean commit) {
855        PageStreamData d = (PageStreamData) store.getPage(pageId);
856        d.initWrite();
857        Data buff = store.createData();
858        buff.writeByte((byte) (commit ? COMMIT : ROLLBACK));
859        buff.writeVarInt(sessionId);
860        byte[] bytes = buff.getBytes();
861        d.write(bytes, 0, bytes.length);
862        bytes = new byte[d.getRemaining()];
863        d.write(bytes, 0, bytes.length);
864        d.write();
865    }
866 
867    /**
868     * Called after the recovery has been completed.
869     */
870    void recoverEnd() {
871        sessionStates = New.hashMap();
872    }
873 
874    private void flushOut() {
875        pageOut.flush();
876    }
877 
878    private Data getBuffer() {
879        if (writeBuffer.length() == 0) {
880            return writeBuffer;
881        }
882        return store.createData();
883    }
884 
885 
886    /**
887     * Get the smallest possible page id used. This is the trunk page if only
888     * appending at the end of the file, or 0.
889     *
890     * @return the smallest possible page.
891     */
892    int getMinPageId() {
893        return pageOut == null ? 0 : pageOut.getMinPageId();
894    }
895 
896}

[all classes][org.h2.store]
EMMA 2.0.5312 (C) Vladimir Roubtsov