EMMA Coverage Report (generated Sun Mar 01 22:06:14 CET 2015)
[all classes][org.h2.store]

COVERAGE SUMMARY FOR SOURCE FILE [LobStorageBackend.java]

nameclass, %method, %block, %line, %
LobStorageBackend.java50%  (1/2)16%  (5/32)13%  (205/1616)15%  (63.4/411)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class LobStorageBackend$LobInputStream0%   (0/1)0%   (0/9)0%   (0/398)0%   (0/92)
LobStorageBackend$LobInputStream (LobStorageBackend, long, long): void 0%   (0/1)0%   (0/129)0%   (0/33)
available (): int 0%   (0/1)0%   (0/4)0%   (0/1)
fillBuffer (): void 0%   (0/1)0%   (0/50)0%   (0/13)
read (): int 0%   (0/1)0%   (0/28)0%   (0/5)
read (byte []): int 0%   (0/1)0%   (0/7)0%   (0/1)
read (byte [], int, int): int 0%   (0/1)0%   (0/6)0%   (0/1)
readFully (byte [], int, int): int 0%   (0/1)0%   (0/72)0%   (0/17)
skip (long): long 0%   (0/1)0%   (0/65)0%   (0/15)
skipSmall (long): int 0%   (0/1)0%   (0/37)0%   (0/6)
     
class LobStorageBackend100% (1/1)22%  (5/23)17%  (205/1218)20%  (63.4/319)
addLob (InputStream, long, int, CountingReaderInputStream): ValueLobDb 0%   (0/1)0%   (0/185)0%   (0/44)
assertHoldsLock (Object): void 0%   (0/1)0%   (0/6)0%   (0/3)
assertNotHolds (Object): void 0%   (0/1)0%   (0/6)0%   (0/3)
copyLob (ValueLobDb, int, long): ValueLobDb 0%   (0/1)0%   (0/102)0%   (0/26)
createBlob (InputStream, long): Value 0%   (0/1)0%   (0/9)0%   (0/2)
createClob (Reader, long): Value 0%   (0/1)0%   (0/25)0%   (0/5)
getHashCacheBlock (int): long 0%   (0/1)0%   (0/25)0%   (0/6)
getInputStream (ValueLobDb, byte [], long): InputStream 0%   (0/1)0%   (0/45)0%   (0/10)
getNextLobId (): long 0%   (0/1)0%   (0/48)0%   (0/13)
initHashCache (): void 0%   (0/1)0%   (0/8)0%   (0/3)
isReadOnly (): boolean 0%   (0/1)0%   (0/4)0%   (0/1)
readBlock (long): byte [] 0%   (0/1)0%   (0/78)0%   (0/17)
registerLob (int, long, int, long, long): ValueLobDb 0%   (0/1)0%   (0/65)0%   (0/14)
removeLob (ValueLobDb): void 0%   (0/1)0%   (0/5)0%   (0/2)
removeLob (long): void 0%   (0/1)0%   (0/142)0%   (0/38)
setHashCacheBlock (int, long): void 0%   (0/1)0%   (0/20)0%   (0/5)
setTable (ValueLobDb, int): void 0%   (0/1)0%   (0/63)0%   (0/17)
storeBlock (long, int, long, byte [], String): void 0%   (0/1)0%   (0/151)0%   (0/38)
removeAllForTable (int): void 100% (1/1)77%  (33/43)80%  (12/15)
reuse (String, PreparedStatement): void 100% (1/1)87%  (13/15)80%  (4/5)
init (): void 100% (1/1)91%  (126/138)91%  (36.4/40)
prepare (String): PreparedStatement 100% (1/1)91%  (21/23)86%  (6/7)
LobStorageBackend (Database): void 100% (1/1)100% (12/12)100% (5/5)

1/*
2 * Copyright 2004-2014 H2 Group. Multiple-Licensed under the MPL 2.0,
3 * and the EPL 1.0 (http://h2database.com/html/license.html).
4 * Initial Developer: H2 Group
5 */
6package org.h2.store;
7 
8import java.io.IOException;
9import java.io.InputStream;
10import java.io.Reader;
11import java.sql.PreparedStatement;
12import java.sql.ResultSet;
13import java.sql.SQLException;
14import java.sql.Statement;
15import java.util.ArrayList;
16import java.util.Arrays;
17import java.util.HashMap;
18 
19import org.h2.api.ErrorCode;
20import org.h2.engine.Database;
21import org.h2.engine.SysProperties;
22import org.h2.jdbc.JdbcConnection;
23import org.h2.message.DbException;
24import org.h2.tools.CompressTool;
25import org.h2.util.IOUtils;
26import org.h2.util.MathUtils;
27import org.h2.util.New;
28import org.h2.value.Value;
29import org.h2.value.ValueLobDb;
30 
31/**
32 * This class stores LOB objects in the database, in tables. This is the
33 * back-end i.e. the server side of the LOB storage.
34 * <p>
35 * Using the system session
36 * <p>
37 * Why do we use the system session to store the data? Some LOB operations can
38 * take a very long time. If we did them on a normal session, we would be
39 * locking the LOB tables for long periods of time, which is extremely
40 * detrimental to the rest of the system. Perhaps when we shift to the MVStore
41 * engine, we can revisit this design decision (using the StreamStore, that is,
42 * no connection at all).
43 * <p>
44 * Locking
45 * <p>
46 * Normally, the locking order in H2 is: first lock the Session object, then
47 * lock the Database object. However, in the case of the LOB data, we are using
48 * the system session to store the data. If we locked the normal way, we see
49 * deadlocks caused by the following pattern:
50 *
51 * <pre>
52 *  Thread 1:
53 *     locks normal session
54 *     locks database
55 *     waiting to lock system session
56 *  Thread 2:
57 *      locks system session
58 *      waiting to lock database.
59 * </pre>
60 *
61 * So, in this class alone, we do two things: we have our very own dedicated
62 * session, the LOB session, and we take the locks in this order: first the
63 * Database object, and then the LOB session. Since we own the LOB session,
64 * no-one else can lock on it, and we are safe.
65 */
66public class LobStorageBackend implements LobStorageInterface {
67 
68    /**
69     * The name of the lob data table. If this table exists, then lob storage is
70     * used.
71     */
72    public static final String LOB_DATA_TABLE = "LOB_DATA";
73 
74    private static final String LOB_SCHEMA = "INFORMATION_SCHEMA";
75    private static final String LOBS = LOB_SCHEMA + ".LOBS";
76    private static final String LOB_MAP = LOB_SCHEMA + ".LOB_MAP";
77    private static final String LOB_DATA = LOB_SCHEMA + "." + LOB_DATA_TABLE;
78 
79    /**
80     * The size of the chunks we use when storing LOBs inside the database file.
81     */
82    private static final int BLOCK_LENGTH = 20000;
83 
84    /**
85     * The size of cache for lob block hashes. Each entry needs 2 longs (16
86     * bytes), therefore, the size 4096 means 64 KB.
87     */
88    private static final int HASH_CACHE_SIZE = 4 * 1024;
89 
90    JdbcConnection conn;
91    final Database database;
92 
93    private final HashMap<String, PreparedStatement> prepared = New.hashMap();
94    private long nextBlock;
95    private final CompressTool compress = CompressTool.getInstance();
96    private long[] hashBlocks;
97 
98    private boolean init;
99 
100    public LobStorageBackend(Database database) {
101        this.database = database;
102    }
103 
104    @Override
105    public void init() {
106        if (init) {
107            return;
108        }
109        synchronized (database) {
110            // have to check this again or we might miss an update on another
111            // thread
112            if (init) {
113                return;
114            }
115            init = true;
116            conn = database.getLobConnectionForRegularUse();
117            JdbcConnection initConn = database.getLobConnectionForInit();
118            try {
119                Statement stat = initConn.createStatement();
120                // stat.execute("SET UNDO_LOG 0");
121                // stat.execute("SET REDO_LOG_BINARY 0");
122                boolean create = true;
123                PreparedStatement prep = initConn.prepareStatement(
124                        "SELECT ZERO() FROM INFORMATION_SCHEMA.COLUMNS WHERE " +
125                        "TABLE_SCHEMA=? AND TABLE_NAME=? AND COLUMN_NAME=?");
126                prep.setString(1, "INFORMATION_SCHEMA");
127                prep.setString(2, "LOB_MAP");
128                prep.setString(3, "POS");
129                ResultSet rs;
130                rs = prep.executeQuery();
131                if (rs.next()) {
132                    prep = initConn.prepareStatement(
133                            "SELECT ZERO() FROM INFORMATION_SCHEMA.TABLES WHERE " +
134                            "TABLE_SCHEMA=? AND TABLE_NAME=?");
135                    prep.setString(1, "INFORMATION_SCHEMA");
136                    prep.setString(2, "LOB_DATA");
137                    rs = prep.executeQuery();
138                    if (rs.next()) {
139                        create = false;
140                    }
141                }
142                if (create) {
143                    stat.execute("CREATE CACHED TABLE IF NOT EXISTS " + LOBS +
144                            "(ID BIGINT PRIMARY KEY, BYTE_COUNT BIGINT, TABLE INT) HIDDEN");
145                    stat.execute("CREATE INDEX IF NOT EXISTS " +
146                            "INFORMATION_SCHEMA.INDEX_LOB_TABLE ON " +
147                            LOBS + "(TABLE)");
148                    stat.execute("CREATE CACHED TABLE IF NOT EXISTS " + LOB_MAP +
149                            "(LOB BIGINT, SEQ INT, POS BIGINT, HASH INT, " +
150                            "BLOCK BIGINT, PRIMARY KEY(LOB, SEQ)) HIDDEN");
151                    stat.execute("ALTER TABLE " + LOB_MAP +
152                            " RENAME TO " + LOB_MAP + " HIDDEN");
153                    stat.execute("ALTER TABLE " + LOB_MAP +
154                            " ADD IF NOT EXISTS POS BIGINT BEFORE HASH");
155                    // TODO the column name OFFSET was used in version 1.3.156,
156                    // so this can be remove in a later version
157                    stat.execute("ALTER TABLE " + LOB_MAP +
158                            " DROP COLUMN IF EXISTS \"OFFSET\"");
159                    stat.execute("CREATE INDEX IF NOT EXISTS " +
160                            "INFORMATION_SCHEMA.INDEX_LOB_MAP_DATA_LOB ON " +
161                            LOB_MAP + "(BLOCK, LOB)");
162                    stat.execute("CREATE CACHED TABLE IF NOT EXISTS " +
163                            LOB_DATA +
164                            "(BLOCK BIGINT PRIMARY KEY, COMPRESSED INT, DATA BINARY) HIDDEN");
165                }
166                rs = stat.executeQuery("SELECT MAX(BLOCK) FROM " + LOB_DATA);
167                rs.next();
168                nextBlock = rs.getLong(1) + 1;
169                stat.close();
170            } catch (SQLException e) {
171                throw DbException.convert(e);
172            }
173        }
174    }
175 
176    private long getNextLobId() throws SQLException {
177        String sql = "SELECT MAX(LOB) FROM " + LOB_MAP;
178        PreparedStatement prep = prepare(sql);
179        ResultSet rs = prep.executeQuery();
180        rs.next();
181        long x = rs.getLong(1) + 1;
182        reuse(sql, prep);
183        sql = "SELECT MAX(ID) FROM " + LOBS;
184        prep = prepare(sql);
185        rs = prep.executeQuery();
186        rs.next();
187        x = Math.max(x, rs.getLong(1) + 1);
188        reuse(sql, prep);
189        return x;
190    }
191 
192    @Override
193    public void removeAllForTable(int tableId) {
194        init();
195        try {
196            String sql = "SELECT ID FROM " + LOBS + " WHERE TABLE = ?";
197            PreparedStatement prep = prepare(sql);
198            prep.setInt(1, tableId);
199            ResultSet rs = prep.executeQuery();
200            while (rs.next()) {
201                removeLob(rs.getLong(1));
202            }
203            reuse(sql, prep);
204        } catch (SQLException e) {
205            throw DbException.convert(e);
206        }
207        if (tableId == LobStorageFrontend.TABLE_ID_SESSION_VARIABLE) {
208            removeAllForTable(LobStorageFrontend.TABLE_TEMP);
209            removeAllForTable(LobStorageFrontend.TABLE_RESULT);
210        }
211    }
212 
213    /**
214     * Read a block of data from the given LOB.
215     *
216     * @param block the block number
217     * @return the block (expanded if stored compressed)
218     */
219    byte[] readBlock(long block) throws SQLException {
220        // see locking discussion at the top
221        assertNotHolds(conn.getSession());
222        synchronized (database) {
223            synchronized (conn.getSession()) {
224                String sql = "SELECT COMPRESSED, DATA FROM " +
225                        LOB_DATA + " WHERE BLOCK = ?";
226                PreparedStatement prep = prepare(sql);
227                prep.setLong(1, block);
228                ResultSet rs = prep.executeQuery();
229                if (!rs.next()) {
230                    throw DbException.get(ErrorCode.IO_EXCEPTION_1,
231                            "Missing lob entry, block: " + block)
232                            .getSQLException();
233                }
234                int compressed = rs.getInt(1);
235                byte[] buffer = rs.getBytes(2);
236                if (compressed != 0) {
237                    buffer = compress.expand(buffer);
238                }
239                reuse(sql, prep);
240                return buffer;
241            }
242        }
243    }
244 
245    /**
246     * Create a prepared statement, or re-use an existing one.
247     *
248     * @param sql the SQL statement
249     * @return the prepared statement
250     */
251    PreparedStatement prepare(String sql) throws SQLException {
252        if (SysProperties.CHECK2) {
253            if (!Thread.holdsLock(database)) {
254                throw DbException.throwInternalError();
255            }
256        }
257        PreparedStatement prep = prepared.remove(sql);
258        if (prep == null) {
259            prep = conn.prepareStatement(sql);
260        }
261        return prep;
262    }
263 
264    /**
265     * Allow to re-use the prepared statement.
266     *
267     * @param sql the SQL statement
268     * @param prep the prepared statement
269     */
270    void reuse(String sql, PreparedStatement prep) {
271        if (SysProperties.CHECK2) {
272            if (!Thread.holdsLock(database)) {
273                throw DbException.throwInternalError();
274            }
275        }
276        prepared.put(sql, prep);
277    }
278 
279    @Override
280    public void removeLob(ValueLobDb lob) {
281        removeLob(lob.getLobId());
282    }
283 
284    private void removeLob(long lobId) {
285        try {
286            // see locking discussion at the top
287            assertNotHolds(conn.getSession());
288            synchronized (database) {
289                synchronized (conn.getSession()) {
290                    String sql = "SELECT BLOCK, HASH FROM " + LOB_MAP + " D WHERE D.LOB = ? " +
291                            "AND NOT EXISTS(SELECT 1 FROM " + LOB_MAP + " O " +
292                            "WHERE O.BLOCK = D.BLOCK AND O.LOB <> ?)";
293                    PreparedStatement prep = prepare(sql);
294                    prep.setLong(1, lobId);
295                    prep.setLong(2, lobId);
296                    ResultSet rs = prep.executeQuery();
297                    ArrayList<Long> blocks = New.arrayList();
298                    while (rs.next()) {
299                        blocks.add(rs.getLong(1));
300                        int hash = rs.getInt(2);
301                        setHashCacheBlock(hash, -1);
302                    }
303                    reuse(sql, prep);
304 
305                    sql = "DELETE FROM " + LOB_MAP + " WHERE LOB = ?";
306                    prep = prepare(sql);
307                    prep.setLong(1, lobId);
308                    prep.execute();
309                    reuse(sql, prep);
310 
311                    sql = "DELETE FROM " + LOB_DATA + " WHERE BLOCK = ?";
312                    prep = prepare(sql);
313                    for (long block : blocks) {
314                        prep.setLong(1, block);
315                        prep.execute();
316                    }
317                    reuse(sql, prep);
318 
319                    sql = "DELETE FROM " + LOBS + " WHERE ID = ?";
320                    prep = prepare(sql);
321                    prep.setLong(1, lobId);
322                    prep.execute();
323                    reuse(sql, prep);
324                }
325            }
326        } catch (SQLException e) {
327            throw DbException.convert(e);
328        }
329    }
330 
331    @Override
332    public InputStream getInputStream(ValueLobDb lob, byte[] hmac,
333            long byteCount) throws IOException {
334        try {
335            init();
336            assertNotHolds(conn.getSession());
337            // see locking discussion at the top
338            synchronized (database) {
339                synchronized (conn.getSession()) {
340                    long lobId = lob.getLobId();
341                    return new LobInputStream(lobId, byteCount);
342                }
343            }
344        } catch (SQLException e) {
345            throw DbException.convertToIOException(e);
346        }
347    }
348 
349    private ValueLobDb addLob(InputStream in, long maxLength, int type,
350            CountingReaderInputStream countingReaderForClob) {
351        try {
352            byte[] buff = new byte[BLOCK_LENGTH];
353            if (maxLength < 0) {
354                maxLength = Long.MAX_VALUE;
355            }
356            long length = 0;
357            long lobId = -1;
358            int maxLengthInPlaceLob = database.getMaxLengthInplaceLob();
359            String compressAlgorithm = database.getLobCompressionAlgorithm(type);
360            try {
361                byte[] small = null;
362                for (int seq = 0; maxLength > 0; seq++) {
363                    int len = (int) Math.min(BLOCK_LENGTH, maxLength);
364                    len = IOUtils.readFully(in, buff, len);
365                    if (len <= 0) {
366                        break;
367                    }
368                    maxLength -= len;
369                    // if we had a short read, trim the buffer
370                    byte[] b;
371                    if (len != buff.length) {
372                        b = new byte[len];
373                        System.arraycopy(buff, 0, b, 0, len);
374                    } else {
375                        b = buff;
376                    }
377                    if (seq == 0 && b.length < BLOCK_LENGTH &&
378                            b.length <= maxLengthInPlaceLob) {
379                        small = b;
380                        break;
381                    }
382                    assertNotHolds(conn.getSession());
383                    // see locking discussion at the top
384                    synchronized (database) {
385                        synchronized (conn.getSession()) {
386                            if (seq == 0) {
387                                lobId = getNextLobId();
388                            }
389                            storeBlock(lobId, seq, length, b, compressAlgorithm);
390                        }
391                    }
392                    length += len;
393                }
394                if (lobId == -1 && small == null) {
395                    // zero length
396                    small = new byte[0];
397                }
398                if (small != null) {
399                    // For a BLOB, precision is length in bytes.
400                    // For a CLOB, precision is length in chars
401                    long precision = countingReaderForClob == null ?
402                            small.length : countingReaderForClob.getLength();
403                    ValueLobDb v = ValueLobDb.createSmallLob(type, small, precision);
404                    return v;
405                }
406                // For a BLOB, precision is length in bytes.
407                // For a CLOB, precision is length in chars
408                long precision = countingReaderForClob == null ?
409                        length : countingReaderForClob.getLength();
410                return registerLob(type, lobId,
411                        LobStorageFrontend.TABLE_TEMP, length, precision);
412            } catch (IOException e) {
413                if (lobId != -1) {
414                    removeLob(lobId);
415                }
416                throw DbException.convertIOException(e, null);
417            }
418        } catch (SQLException e) {
419            throw DbException.convert(e);
420        }
421    }
422 
423    private ValueLobDb registerLob(int type, long lobId, int tableId,
424            long byteCount, long precision) throws SQLException {
425        assertNotHolds(conn.getSession());
426        // see locking discussion at the top
427        synchronized (database) {
428            synchronized (conn.getSession()) {
429                String sql = "INSERT INTO " + LOBS +
430                        "(ID, BYTE_COUNT, TABLE) VALUES(?, ?, ?)";
431                PreparedStatement prep = prepare(sql);
432                prep.setLong(1, lobId);
433                prep.setLong(2, byteCount);
434                prep.setInt(3, tableId);
435                prep.execute();
436                reuse(sql, prep);
437                ValueLobDb v = ValueLobDb.create(type,
438                        database, tableId, lobId, null, precision);
439                return v;
440            }
441        }
442    }
443 
444    @Override
445    public boolean isReadOnly() {
446        return database.isReadOnly();
447    }
448 
449    @Override
450    public ValueLobDb copyLob(ValueLobDb old, int tableId, long length) {
451        int type = old.getType();
452        long oldLobId = old.getLobId();
453        assertNotHolds(conn.getSession());
454        // see locking discussion at the top
455        synchronized (database) {
456            synchronized (conn.getSession()) {
457                try {
458                    init();
459                    long lobId = getNextLobId();
460                    String sql = "INSERT INTO " + LOB_MAP + "(LOB, SEQ, POS, HASH, BLOCK) " +
461                            "SELECT ?, SEQ, POS, HASH, BLOCK FROM " + LOB_MAP + " WHERE LOB = ?";
462                    PreparedStatement prep = prepare(sql);
463                    prep.setLong(1, lobId);
464                    prep.setLong(2, oldLobId);
465                    prep.executeUpdate();
466                    reuse(sql, prep);
467 
468                    sql = "INSERT INTO " + LOBS + "(ID, BYTE_COUNT, TABLE) " +
469                            "SELECT ?, BYTE_COUNT, ? FROM " + LOBS + " WHERE ID = ?";
470                    prep = prepare(sql);
471                    prep.setLong(1, lobId);
472                    prep.setLong(2, tableId);
473                    prep.setLong(3, oldLobId);
474                    prep.executeUpdate();
475                    reuse(sql, prep);
476 
477                    ValueLobDb v = ValueLobDb.create(type, database, tableId, lobId, null, length);
478                    return v;
479                } catch (SQLException e) {
480                    throw DbException.convert(e);
481                }
482            }
483        }
484    }
485 
486    private long getHashCacheBlock(int hash) {
487        if (HASH_CACHE_SIZE > 0) {
488            initHashCache();
489            int index = hash & (HASH_CACHE_SIZE - 1);
490            long oldHash = hashBlocks[index];
491            if (oldHash == hash) {
492                return hashBlocks[index + HASH_CACHE_SIZE];
493            }
494        }
495        return -1;
496    }
497 
498    private void setHashCacheBlock(int hash, long block) {
499        if (HASH_CACHE_SIZE > 0) {
500            initHashCache();
501            int index = hash & (HASH_CACHE_SIZE - 1);
502            hashBlocks[index] = hash;
503            hashBlocks[index + HASH_CACHE_SIZE] = block;
504        }
505    }
506 
507    private void initHashCache() {
508        if (hashBlocks == null) {
509            hashBlocks = new long[HASH_CACHE_SIZE * 2];
510        }
511    }
512 
513    /**
514     * Store a block in the LOB storage.
515     *
516     * @param lobId the lob id
517     * @param seq the sequence number
518     * @param pos the position within the lob
519     * @param b the data
520     * @param compressAlgorithm the compression algorithm (may be null)
521     */
522    void storeBlock(long lobId, int seq, long pos, byte[] b,
523            String compressAlgorithm) throws SQLException {
524        long block;
525        boolean blockExists = false;
526        if (compressAlgorithm != null) {
527            b = compress.compress(b, compressAlgorithm);
528        }
529        int hash = Arrays.hashCode(b);
530        assertHoldsLock(conn.getSession());
531        assertHoldsLock(database);
532        block = getHashCacheBlock(hash);
533        if (block != -1) {
534            String sql =  "SELECT COMPRESSED, DATA FROM " + LOB_DATA +
535                    " WHERE BLOCK = ?";
536            PreparedStatement prep = prepare(sql);
537            prep.setLong(1, block);
538            ResultSet rs = prep.executeQuery();
539            if (rs.next()) {
540                boolean compressed = rs.getInt(1) != 0;
541                byte[] compare = rs.getBytes(2);
542                if (compressed == (compressAlgorithm != null) && Arrays.equals(b, compare)) {
543                    blockExists = true;
544                }
545            }
546            reuse(sql, prep);
547        }
548        if (!blockExists) {
549            block = nextBlock++;
550            setHashCacheBlock(hash, block);
551            String sql = "INSERT INTO " + LOB_DATA +
552                    "(BLOCK, COMPRESSED, DATA) VALUES(?, ?, ?)";
553            PreparedStatement prep = prepare(sql);
554            prep.setLong(1, block);
555            prep.setInt(2, compressAlgorithm == null ? 0 : 1);
556            prep.setBytes(3, b);
557            prep.execute();
558            reuse(sql, prep);
559        }
560        String sql = "INSERT INTO " + LOB_MAP +
561                "(LOB, SEQ, POS, HASH, BLOCK) VALUES(?, ?, ?, ?, ?)";
562        PreparedStatement prep = prepare(sql);
563        prep.setLong(1, lobId);
564        prep.setInt(2, seq);
565        prep.setLong(3, pos);
566        prep.setLong(4, hash);
567        prep.setLong(5, block);
568        prep.execute();
569        reuse(sql, prep);
570    }
571 
572    @Override
573    public Value createBlob(InputStream in, long maxLength) {
574        init();
575        return addLob(in, maxLength, Value.BLOB, null);
576    }
577 
578    @Override
579    public Value createClob(Reader reader, long maxLength) {
580        init();
581        long max = maxLength == -1 ? Long.MAX_VALUE : maxLength;
582        CountingReaderInputStream in = new CountingReaderInputStream(reader, max);
583        ValueLobDb lob = addLob(in, Long.MAX_VALUE, Value.CLOB, in);
584        return lob;
585    }
586 
587    @Override
588    public void setTable(ValueLobDb lob, int table) {
589        long lobId = lob.getLobId();
590        assertNotHolds(conn.getSession());
591        // see locking discussion at the top
592        synchronized (database) {
593            synchronized (conn.getSession()) {
594                try {
595                    init();
596                    String sql = "UPDATE " + LOBS + " SET TABLE = ? WHERE ID = ?";
597                    PreparedStatement prep = prepare(sql);
598                    prep.setInt(1, table);
599                    prep.setLong(2, lobId);
600                    prep.executeUpdate();
601                    reuse(sql, prep);
602                } catch (SQLException e) {
603                    throw DbException.convert(e);
604                }
605            }
606        }
607    }
608 
609    private static void assertNotHolds(Object lock) {
610        if (Thread.holdsLock(lock)) {
611            throw DbException.throwInternalError();
612        }
613    }
614 
615    /**
616     * Check whether this thread has synchronized on this object.
617     *
618     * @param lock the object
619     */
620    static void assertHoldsLock(Object lock) {
621        if (!Thread.holdsLock(lock)) {
622            throw DbException.throwInternalError();
623        }
624    }
625 
626    /**
627     * An input stream that reads from a LOB.
628     */
629    public class LobInputStream extends InputStream {
630 
631        /**
632         * Data from the LOB_MAP table. We cache this to prevent other updates
633         * to the table that contains the LOB column from changing the data
634         * under us.
635         */
636        private final long[] lobMapBlocks;
637 
638        /**
639         * index into the lobMapBlocks array.
640         */
641        private int lobMapIndex;
642 
643        /**
644         * The remaining bytes in the lob.
645         */
646        private long remainingBytes;
647 
648        /**
649         * The temporary buffer.
650         */
651        private byte[] buffer;
652 
653        /**
654         * The position within the buffer.
655         */
656        private int bufferPos;
657 
658 
659        public LobInputStream(long lobId, long byteCount) throws SQLException {
660 
661            // we have to take the lock on the session
662            // before the lock on the database to prevent ABBA deadlocks
663            assertHoldsLock(conn.getSession());
664            assertHoldsLock(database);
665 
666            if (byteCount == -1) {
667                String sql = "SELECT BYTE_COUNT FROM " + LOBS + " WHERE ID = ?";
668                PreparedStatement prep = prepare(sql);
669                prep.setLong(1, lobId);
670                ResultSet rs = prep.executeQuery();
671                if (!rs.next()) {
672                    throw DbException.get(ErrorCode.IO_EXCEPTION_1,
673                            "Missing lob entry: " + lobId).getSQLException();
674                }
675                byteCount = rs.getLong(1);
676                reuse(sql, prep);
677            }
678            this.remainingBytes = byteCount;
679 
680            String sql = "SELECT COUNT(*) FROM " + LOB_MAP + " WHERE LOB = ?";
681            PreparedStatement prep = prepare(sql);
682            prep.setLong(1, lobId);
683            ResultSet rs = prep.executeQuery();
684            rs.next();
685            int lobMapCount = rs.getInt(1);
686            if (lobMapCount == 0) {
687                throw DbException.get(ErrorCode.IO_EXCEPTION_1,
688                        "Missing lob entry: " + lobId).getSQLException();
689            }
690            reuse(sql, prep);
691 
692            this.lobMapBlocks = new long[lobMapCount];
693 
694            sql = "SELECT BLOCK FROM " + LOB_MAP + " WHERE LOB = ? ORDER BY SEQ";
695            prep = prepare(sql);
696            prep.setLong(1, lobId);
697            rs = prep.executeQuery();
698            int i = 0;
699            while (rs.next()) {
700                this.lobMapBlocks[i] = rs.getLong(1);
701                i++;
702            }
703            reuse(sql, prep);
704        }
705 
706        @Override
707        public int read() throws IOException {
708            fillBuffer();
709            if (remainingBytes <= 0) {
710                return -1;
711            }
712            remainingBytes--;
713            return buffer[bufferPos++] & 255;
714        }
715 
716        @Override
717        public long skip(long n) throws IOException {
718            if (n <= 0) {
719                return 0;
720            }
721            long remaining = n;
722            remaining -= skipSmall(remaining);
723            if (remaining > BLOCK_LENGTH) {
724                while (remaining > BLOCK_LENGTH) {
725                    remaining -= BLOCK_LENGTH;
726                    remainingBytes -= BLOCK_LENGTH;
727                    lobMapIndex++;
728                }
729                bufferPos = 0;
730                buffer = null;
731            }
732            fillBuffer();
733            remaining -= skipSmall(remaining);
734            remaining -= super.skip(remaining);
735            return n - remaining;
736        }
737 
738        private int skipSmall(long n) {
739            if (buffer != null && bufferPos < buffer.length) {
740                int x = MathUtils.convertLongToInt(Math.min(n, buffer.length - bufferPos));
741                bufferPos += x;
742                remainingBytes -= x;
743                return x;
744            }
745            return 0;
746        }
747 
748        @Override
749        public int available() throws IOException {
750            return MathUtils.convertLongToInt(remainingBytes);
751        }
752 
753        @Override
754        public int read(byte[] buff) throws IOException {
755            return readFully(buff, 0, buff.length);
756        }
757 
758        @Override
759        public int read(byte[] buff, int off, int length) throws IOException {
760            return readFully(buff, off, length);
761        }
762 
763        private int readFully(byte[] buff, int off, int length) throws IOException {
764            if (length == 0) {
765                return 0;
766            }
767            int read = 0;
768            while (length > 0) {
769                fillBuffer();
770                if (remainingBytes <= 0) {
771                    break;
772                }
773                int len = (int) Math.min(length, remainingBytes);
774                len = Math.min(len, buffer.length - bufferPos);
775                System.arraycopy(buffer, bufferPos, buff, off, len);
776                bufferPos += len;
777                read += len;
778                remainingBytes -= len;
779                off += len;
780                length -= len;
781            }
782            return read == 0 ? -1 : read;
783        }
784 
785        private void fillBuffer() throws IOException {
786            if (buffer != null && bufferPos < buffer.length) {
787                return;
788            }
789            if (remainingBytes <= 0) {
790                return;
791            }
792if (lobMapIndex >= lobMapBlocks.length) {
793    System.out.println("halt!");
794}
795            try {
796                buffer = readBlock(lobMapBlocks[lobMapIndex]);
797                lobMapIndex++;
798                bufferPos = 0;
799            } catch (SQLException e) {
800                throw DbException.convertToIOException(e);
801            }
802        }
803 
804    }
805 
806}

[all classes][org.h2.store]
EMMA 2.0.5312 (C) Vladimir Roubtsov