1 | /* |
2 | * Copyright 2004-2014 H2 Group. Multiple-Licensed under the MPL 2.0, |
3 | * and the EPL 1.0 (http://h2database.com/html/license.html). |
4 | * Initial Developer: H2 Group |
5 | */ |
6 | package org.h2.store; |
7 | |
8 | import java.io.IOException; |
9 | import java.io.InputStream; |
10 | import java.io.Reader; |
11 | import java.sql.PreparedStatement; |
12 | import java.sql.ResultSet; |
13 | import java.sql.SQLException; |
14 | import java.sql.Statement; |
15 | import java.util.ArrayList; |
16 | import java.util.Arrays; |
17 | import java.util.HashMap; |
18 | |
19 | import org.h2.api.ErrorCode; |
20 | import org.h2.engine.Database; |
21 | import org.h2.engine.SysProperties; |
22 | import org.h2.jdbc.JdbcConnection; |
23 | import org.h2.message.DbException; |
24 | import org.h2.tools.CompressTool; |
25 | import org.h2.util.IOUtils; |
26 | import org.h2.util.MathUtils; |
27 | import org.h2.util.New; |
28 | import org.h2.value.Value; |
29 | import org.h2.value.ValueLobDb; |
30 | |
31 | /** |
32 | * This class stores LOB objects in the database, in tables. This is the |
33 | * back-end i.e. the server side of the LOB storage. |
34 | * <p> |
35 | * Using the system session |
36 | * <p> |
37 | * Why do we use the system session to store the data? Some LOB operations can |
38 | * take a very long time. If we did them on a normal session, we would be |
39 | * locking the LOB tables for long periods of time, which is extremely |
40 | * detrimental to the rest of the system. Perhaps when we shift to the MVStore |
41 | * engine, we can revisit this design decision (using the StreamStore, that is, |
42 | * no connection at all). |
43 | * <p> |
44 | * Locking |
45 | * <p> |
46 | * Normally, the locking order in H2 is: first lock the Session object, then |
47 | * lock the Database object. However, in the case of the LOB data, we are using |
48 | * the system session to store the data. If we locked the normal way, we see |
49 | * deadlocks caused by the following pattern: |
50 | * |
51 | * <pre> |
52 | * Thread 1: |
53 | * locks normal session |
54 | * locks database |
55 | * waiting to lock system session |
56 | * Thread 2: |
57 | * locks system session |
58 | * waiting to lock database. |
59 | * </pre> |
60 | * |
61 | * So, in this class alone, we do two things: we have our very own dedicated |
62 | * session, the LOB session, and we take the locks in this order: first the |
63 | * Database object, and then the LOB session. Since we own the LOB session, |
64 | * no-one else can lock on it, and we are safe. |
65 | */ |
66 | public class LobStorageBackend implements LobStorageInterface { |
67 | |
68 | /** |
69 | * The name of the lob data table. If this table exists, then lob storage is |
70 | * used. |
71 | */ |
72 | public static final String LOB_DATA_TABLE = "LOB_DATA"; |
73 | |
74 | private static final String LOB_SCHEMA = "INFORMATION_SCHEMA"; |
75 | private static final String LOBS = LOB_SCHEMA + ".LOBS"; |
76 | private static final String LOB_MAP = LOB_SCHEMA + ".LOB_MAP"; |
77 | private static final String LOB_DATA = LOB_SCHEMA + "." + LOB_DATA_TABLE; |
78 | |
79 | /** |
80 | * The size of the chunks we use when storing LOBs inside the database file. |
81 | */ |
82 | private static final int BLOCK_LENGTH = 20000; |
83 | |
84 | /** |
85 | * The size of cache for lob block hashes. Each entry needs 2 longs (16 |
86 | * bytes), therefore, the size 4096 means 64 KB. |
87 | */ |
88 | private static final int HASH_CACHE_SIZE = 4 * 1024; |
89 | |
90 | JdbcConnection conn; |
91 | final Database database; |
92 | |
93 | private final HashMap<String, PreparedStatement> prepared = New.hashMap(); |
94 | private long nextBlock; |
95 | private final CompressTool compress = CompressTool.getInstance(); |
96 | private long[] hashBlocks; |
97 | |
98 | private boolean init; |
99 | |
100 | public LobStorageBackend(Database database) { |
101 | this.database = database; |
102 | } |
103 | |
104 | @Override |
105 | public void init() { |
106 | if (init) { |
107 | return; |
108 | } |
109 | synchronized (database) { |
110 | // have to check this again or we might miss an update on another |
111 | // thread |
112 | if (init) { |
113 | return; |
114 | } |
115 | init = true; |
116 | conn = database.getLobConnectionForRegularUse(); |
117 | JdbcConnection initConn = database.getLobConnectionForInit(); |
118 | try { |
119 | Statement stat = initConn.createStatement(); |
120 | // stat.execute("SET UNDO_LOG 0"); |
121 | // stat.execute("SET REDO_LOG_BINARY 0"); |
122 | boolean create = true; |
123 | PreparedStatement prep = initConn.prepareStatement( |
124 | "SELECT ZERO() FROM INFORMATION_SCHEMA.COLUMNS WHERE " + |
125 | "TABLE_SCHEMA=? AND TABLE_NAME=? AND COLUMN_NAME=?"); |
126 | prep.setString(1, "INFORMATION_SCHEMA"); |
127 | prep.setString(2, "LOB_MAP"); |
128 | prep.setString(3, "POS"); |
129 | ResultSet rs; |
130 | rs = prep.executeQuery(); |
131 | if (rs.next()) { |
132 | prep = initConn.prepareStatement( |
133 | "SELECT ZERO() FROM INFORMATION_SCHEMA.TABLES WHERE " + |
134 | "TABLE_SCHEMA=? AND TABLE_NAME=?"); |
135 | prep.setString(1, "INFORMATION_SCHEMA"); |
136 | prep.setString(2, "LOB_DATA"); |
137 | rs = prep.executeQuery(); |
138 | if (rs.next()) { |
139 | create = false; |
140 | } |
141 | } |
142 | if (create) { |
143 | stat.execute("CREATE CACHED TABLE IF NOT EXISTS " + LOBS + |
144 | "(ID BIGINT PRIMARY KEY, BYTE_COUNT BIGINT, TABLE INT) HIDDEN"); |
145 | stat.execute("CREATE INDEX IF NOT EXISTS " + |
146 | "INFORMATION_SCHEMA.INDEX_LOB_TABLE ON " + |
147 | LOBS + "(TABLE)"); |
148 | stat.execute("CREATE CACHED TABLE IF NOT EXISTS " + LOB_MAP + |
149 | "(LOB BIGINT, SEQ INT, POS BIGINT, HASH INT, " + |
150 | "BLOCK BIGINT, PRIMARY KEY(LOB, SEQ)) HIDDEN"); |
151 | stat.execute("ALTER TABLE " + LOB_MAP + |
152 | " RENAME TO " + LOB_MAP + " HIDDEN"); |
153 | stat.execute("ALTER TABLE " + LOB_MAP + |
154 | " ADD IF NOT EXISTS POS BIGINT BEFORE HASH"); |
155 | // TODO the column name OFFSET was used in version 1.3.156, |
156 | // so this can be remove in a later version |
157 | stat.execute("ALTER TABLE " + LOB_MAP + |
158 | " DROP COLUMN IF EXISTS \"OFFSET\""); |
159 | stat.execute("CREATE INDEX IF NOT EXISTS " + |
160 | "INFORMATION_SCHEMA.INDEX_LOB_MAP_DATA_LOB ON " + |
161 | LOB_MAP + "(BLOCK, LOB)"); |
162 | stat.execute("CREATE CACHED TABLE IF NOT EXISTS " + |
163 | LOB_DATA + |
164 | "(BLOCK BIGINT PRIMARY KEY, COMPRESSED INT, DATA BINARY) HIDDEN"); |
165 | } |
166 | rs = stat.executeQuery("SELECT MAX(BLOCK) FROM " + LOB_DATA); |
167 | rs.next(); |
168 | nextBlock = rs.getLong(1) + 1; |
169 | stat.close(); |
170 | } catch (SQLException e) { |
171 | throw DbException.convert(e); |
172 | } |
173 | } |
174 | } |
175 | |
176 | private long getNextLobId() throws SQLException { |
177 | String sql = "SELECT MAX(LOB) FROM " + LOB_MAP; |
178 | PreparedStatement prep = prepare(sql); |
179 | ResultSet rs = prep.executeQuery(); |
180 | rs.next(); |
181 | long x = rs.getLong(1) + 1; |
182 | reuse(sql, prep); |
183 | sql = "SELECT MAX(ID) FROM " + LOBS; |
184 | prep = prepare(sql); |
185 | rs = prep.executeQuery(); |
186 | rs.next(); |
187 | x = Math.max(x, rs.getLong(1) + 1); |
188 | reuse(sql, prep); |
189 | return x; |
190 | } |
191 | |
192 | @Override |
193 | public void removeAllForTable(int tableId) { |
194 | init(); |
195 | try { |
196 | String sql = "SELECT ID FROM " + LOBS + " WHERE TABLE = ?"; |
197 | PreparedStatement prep = prepare(sql); |
198 | prep.setInt(1, tableId); |
199 | ResultSet rs = prep.executeQuery(); |
200 | while (rs.next()) { |
201 | removeLob(rs.getLong(1)); |
202 | } |
203 | reuse(sql, prep); |
204 | } catch (SQLException e) { |
205 | throw DbException.convert(e); |
206 | } |
207 | if (tableId == LobStorageFrontend.TABLE_ID_SESSION_VARIABLE) { |
208 | removeAllForTable(LobStorageFrontend.TABLE_TEMP); |
209 | removeAllForTable(LobStorageFrontend.TABLE_RESULT); |
210 | } |
211 | } |
212 | |
213 | /** |
214 | * Read a block of data from the given LOB. |
215 | * |
216 | * @param block the block number |
217 | * @return the block (expanded if stored compressed) |
218 | */ |
219 | byte[] readBlock(long block) throws SQLException { |
220 | // see locking discussion at the top |
221 | assertNotHolds(conn.getSession()); |
222 | synchronized (database) { |
223 | synchronized (conn.getSession()) { |
224 | String sql = "SELECT COMPRESSED, DATA FROM " + |
225 | LOB_DATA + " WHERE BLOCK = ?"; |
226 | PreparedStatement prep = prepare(sql); |
227 | prep.setLong(1, block); |
228 | ResultSet rs = prep.executeQuery(); |
229 | if (!rs.next()) { |
230 | throw DbException.get(ErrorCode.IO_EXCEPTION_1, |
231 | "Missing lob entry, block: " + block) |
232 | .getSQLException(); |
233 | } |
234 | int compressed = rs.getInt(1); |
235 | byte[] buffer = rs.getBytes(2); |
236 | if (compressed != 0) { |
237 | buffer = compress.expand(buffer); |
238 | } |
239 | reuse(sql, prep); |
240 | return buffer; |
241 | } |
242 | } |
243 | } |
244 | |
245 | /** |
246 | * Create a prepared statement, or re-use an existing one. |
247 | * |
248 | * @param sql the SQL statement |
249 | * @return the prepared statement |
250 | */ |
251 | PreparedStatement prepare(String sql) throws SQLException { |
252 | if (SysProperties.CHECK2) { |
253 | if (!Thread.holdsLock(database)) { |
254 | throw DbException.throwInternalError(); |
255 | } |
256 | } |
257 | PreparedStatement prep = prepared.remove(sql); |
258 | if (prep == null) { |
259 | prep = conn.prepareStatement(sql); |
260 | } |
261 | return prep; |
262 | } |
263 | |
264 | /** |
265 | * Allow to re-use the prepared statement. |
266 | * |
267 | * @param sql the SQL statement |
268 | * @param prep the prepared statement |
269 | */ |
270 | void reuse(String sql, PreparedStatement prep) { |
271 | if (SysProperties.CHECK2) { |
272 | if (!Thread.holdsLock(database)) { |
273 | throw DbException.throwInternalError(); |
274 | } |
275 | } |
276 | prepared.put(sql, prep); |
277 | } |
278 | |
279 | @Override |
280 | public void removeLob(ValueLobDb lob) { |
281 | removeLob(lob.getLobId()); |
282 | } |
283 | |
284 | private void removeLob(long lobId) { |
285 | try { |
286 | // see locking discussion at the top |
287 | assertNotHolds(conn.getSession()); |
288 | synchronized (database) { |
289 | synchronized (conn.getSession()) { |
290 | String sql = "SELECT BLOCK, HASH FROM " + LOB_MAP + " D WHERE D.LOB = ? " + |
291 | "AND NOT EXISTS(SELECT 1 FROM " + LOB_MAP + " O " + |
292 | "WHERE O.BLOCK = D.BLOCK AND O.LOB <> ?)"; |
293 | PreparedStatement prep = prepare(sql); |
294 | prep.setLong(1, lobId); |
295 | prep.setLong(2, lobId); |
296 | ResultSet rs = prep.executeQuery(); |
297 | ArrayList<Long> blocks = New.arrayList(); |
298 | while (rs.next()) { |
299 | blocks.add(rs.getLong(1)); |
300 | int hash = rs.getInt(2); |
301 | setHashCacheBlock(hash, -1); |
302 | } |
303 | reuse(sql, prep); |
304 | |
305 | sql = "DELETE FROM " + LOB_MAP + " WHERE LOB = ?"; |
306 | prep = prepare(sql); |
307 | prep.setLong(1, lobId); |
308 | prep.execute(); |
309 | reuse(sql, prep); |
310 | |
311 | sql = "DELETE FROM " + LOB_DATA + " WHERE BLOCK = ?"; |
312 | prep = prepare(sql); |
313 | for (long block : blocks) { |
314 | prep.setLong(1, block); |
315 | prep.execute(); |
316 | } |
317 | reuse(sql, prep); |
318 | |
319 | sql = "DELETE FROM " + LOBS + " WHERE ID = ?"; |
320 | prep = prepare(sql); |
321 | prep.setLong(1, lobId); |
322 | prep.execute(); |
323 | reuse(sql, prep); |
324 | } |
325 | } |
326 | } catch (SQLException e) { |
327 | throw DbException.convert(e); |
328 | } |
329 | } |
330 | |
331 | @Override |
332 | public InputStream getInputStream(ValueLobDb lob, byte[] hmac, |
333 | long byteCount) throws IOException { |
334 | try { |
335 | init(); |
336 | assertNotHolds(conn.getSession()); |
337 | // see locking discussion at the top |
338 | synchronized (database) { |
339 | synchronized (conn.getSession()) { |
340 | long lobId = lob.getLobId(); |
341 | return new LobInputStream(lobId, byteCount); |
342 | } |
343 | } |
344 | } catch (SQLException e) { |
345 | throw DbException.convertToIOException(e); |
346 | } |
347 | } |
348 | |
349 | private ValueLobDb addLob(InputStream in, long maxLength, int type, |
350 | CountingReaderInputStream countingReaderForClob) { |
351 | try { |
352 | byte[] buff = new byte[BLOCK_LENGTH]; |
353 | if (maxLength < 0) { |
354 | maxLength = Long.MAX_VALUE; |
355 | } |
356 | long length = 0; |
357 | long lobId = -1; |
358 | int maxLengthInPlaceLob = database.getMaxLengthInplaceLob(); |
359 | String compressAlgorithm = database.getLobCompressionAlgorithm(type); |
360 | try { |
361 | byte[] small = null; |
362 | for (int seq = 0; maxLength > 0; seq++) { |
363 | int len = (int) Math.min(BLOCK_LENGTH, maxLength); |
364 | len = IOUtils.readFully(in, buff, len); |
365 | if (len <= 0) { |
366 | break; |
367 | } |
368 | maxLength -= len; |
369 | // if we had a short read, trim the buffer |
370 | byte[] b; |
371 | if (len != buff.length) { |
372 | b = new byte[len]; |
373 | System.arraycopy(buff, 0, b, 0, len); |
374 | } else { |
375 | b = buff; |
376 | } |
377 | if (seq == 0 && b.length < BLOCK_LENGTH && |
378 | b.length <= maxLengthInPlaceLob) { |
379 | small = b; |
380 | break; |
381 | } |
382 | assertNotHolds(conn.getSession()); |
383 | // see locking discussion at the top |
384 | synchronized (database) { |
385 | synchronized (conn.getSession()) { |
386 | if (seq == 0) { |
387 | lobId = getNextLobId(); |
388 | } |
389 | storeBlock(lobId, seq, length, b, compressAlgorithm); |
390 | } |
391 | } |
392 | length += len; |
393 | } |
394 | if (lobId == -1 && small == null) { |
395 | // zero length |
396 | small = new byte[0]; |
397 | } |
398 | if (small != null) { |
399 | // For a BLOB, precision is length in bytes. |
400 | // For a CLOB, precision is length in chars |
401 | long precision = countingReaderForClob == null ? |
402 | small.length : countingReaderForClob.getLength(); |
403 | ValueLobDb v = ValueLobDb.createSmallLob(type, small, precision); |
404 | return v; |
405 | } |
406 | // For a BLOB, precision is length in bytes. |
407 | // For a CLOB, precision is length in chars |
408 | long precision = countingReaderForClob == null ? |
409 | length : countingReaderForClob.getLength(); |
410 | return registerLob(type, lobId, |
411 | LobStorageFrontend.TABLE_TEMP, length, precision); |
412 | } catch (IOException e) { |
413 | if (lobId != -1) { |
414 | removeLob(lobId); |
415 | } |
416 | throw DbException.convertIOException(e, null); |
417 | } |
418 | } catch (SQLException e) { |
419 | throw DbException.convert(e); |
420 | } |
421 | } |
422 | |
423 | private ValueLobDb registerLob(int type, long lobId, int tableId, |
424 | long byteCount, long precision) throws SQLException { |
425 | assertNotHolds(conn.getSession()); |
426 | // see locking discussion at the top |
427 | synchronized (database) { |
428 | synchronized (conn.getSession()) { |
429 | String sql = "INSERT INTO " + LOBS + |
430 | "(ID, BYTE_COUNT, TABLE) VALUES(?, ?, ?)"; |
431 | PreparedStatement prep = prepare(sql); |
432 | prep.setLong(1, lobId); |
433 | prep.setLong(2, byteCount); |
434 | prep.setInt(3, tableId); |
435 | prep.execute(); |
436 | reuse(sql, prep); |
437 | ValueLobDb v = ValueLobDb.create(type, |
438 | database, tableId, lobId, null, precision); |
439 | return v; |
440 | } |
441 | } |
442 | } |
443 | |
444 | @Override |
445 | public boolean isReadOnly() { |
446 | return database.isReadOnly(); |
447 | } |
448 | |
449 | @Override |
450 | public ValueLobDb copyLob(ValueLobDb old, int tableId, long length) { |
451 | int type = old.getType(); |
452 | long oldLobId = old.getLobId(); |
453 | assertNotHolds(conn.getSession()); |
454 | // see locking discussion at the top |
455 | synchronized (database) { |
456 | synchronized (conn.getSession()) { |
457 | try { |
458 | init(); |
459 | long lobId = getNextLobId(); |
460 | String sql = "INSERT INTO " + LOB_MAP + "(LOB, SEQ, POS, HASH, BLOCK) " + |
461 | "SELECT ?, SEQ, POS, HASH, BLOCK FROM " + LOB_MAP + " WHERE LOB = ?"; |
462 | PreparedStatement prep = prepare(sql); |
463 | prep.setLong(1, lobId); |
464 | prep.setLong(2, oldLobId); |
465 | prep.executeUpdate(); |
466 | reuse(sql, prep); |
467 | |
468 | sql = "INSERT INTO " + LOBS + "(ID, BYTE_COUNT, TABLE) " + |
469 | "SELECT ?, BYTE_COUNT, ? FROM " + LOBS + " WHERE ID = ?"; |
470 | prep = prepare(sql); |
471 | prep.setLong(1, lobId); |
472 | prep.setLong(2, tableId); |
473 | prep.setLong(3, oldLobId); |
474 | prep.executeUpdate(); |
475 | reuse(sql, prep); |
476 | |
477 | ValueLobDb v = ValueLobDb.create(type, database, tableId, lobId, null, length); |
478 | return v; |
479 | } catch (SQLException e) { |
480 | throw DbException.convert(e); |
481 | } |
482 | } |
483 | } |
484 | } |
485 | |
486 | private long getHashCacheBlock(int hash) { |
487 | if (HASH_CACHE_SIZE > 0) { |
488 | initHashCache(); |
489 | int index = hash & (HASH_CACHE_SIZE - 1); |
490 | long oldHash = hashBlocks[index]; |
491 | if (oldHash == hash) { |
492 | return hashBlocks[index + HASH_CACHE_SIZE]; |
493 | } |
494 | } |
495 | return -1; |
496 | } |
497 | |
498 | private void setHashCacheBlock(int hash, long block) { |
499 | if (HASH_CACHE_SIZE > 0) { |
500 | initHashCache(); |
501 | int index = hash & (HASH_CACHE_SIZE - 1); |
502 | hashBlocks[index] = hash; |
503 | hashBlocks[index + HASH_CACHE_SIZE] = block; |
504 | } |
505 | } |
506 | |
507 | private void initHashCache() { |
508 | if (hashBlocks == null) { |
509 | hashBlocks = new long[HASH_CACHE_SIZE * 2]; |
510 | } |
511 | } |
512 | |
513 | /** |
514 | * Store a block in the LOB storage. |
515 | * |
516 | * @param lobId the lob id |
517 | * @param seq the sequence number |
518 | * @param pos the position within the lob |
519 | * @param b the data |
520 | * @param compressAlgorithm the compression algorithm (may be null) |
521 | */ |
522 | void storeBlock(long lobId, int seq, long pos, byte[] b, |
523 | String compressAlgorithm) throws SQLException { |
524 | long block; |
525 | boolean blockExists = false; |
526 | if (compressAlgorithm != null) { |
527 | b = compress.compress(b, compressAlgorithm); |
528 | } |
529 | int hash = Arrays.hashCode(b); |
530 | assertHoldsLock(conn.getSession()); |
531 | assertHoldsLock(database); |
532 | block = getHashCacheBlock(hash); |
533 | if (block != -1) { |
534 | String sql = "SELECT COMPRESSED, DATA FROM " + LOB_DATA + |
535 | " WHERE BLOCK = ?"; |
536 | PreparedStatement prep = prepare(sql); |
537 | prep.setLong(1, block); |
538 | ResultSet rs = prep.executeQuery(); |
539 | if (rs.next()) { |
540 | boolean compressed = rs.getInt(1) != 0; |
541 | byte[] compare = rs.getBytes(2); |
542 | if (compressed == (compressAlgorithm != null) && Arrays.equals(b, compare)) { |
543 | blockExists = true; |
544 | } |
545 | } |
546 | reuse(sql, prep); |
547 | } |
548 | if (!blockExists) { |
549 | block = nextBlock++; |
550 | setHashCacheBlock(hash, block); |
551 | String sql = "INSERT INTO " + LOB_DATA + |
552 | "(BLOCK, COMPRESSED, DATA) VALUES(?, ?, ?)"; |
553 | PreparedStatement prep = prepare(sql); |
554 | prep.setLong(1, block); |
555 | prep.setInt(2, compressAlgorithm == null ? 0 : 1); |
556 | prep.setBytes(3, b); |
557 | prep.execute(); |
558 | reuse(sql, prep); |
559 | } |
560 | String sql = "INSERT INTO " + LOB_MAP + |
561 | "(LOB, SEQ, POS, HASH, BLOCK) VALUES(?, ?, ?, ?, ?)"; |
562 | PreparedStatement prep = prepare(sql); |
563 | prep.setLong(1, lobId); |
564 | prep.setInt(2, seq); |
565 | prep.setLong(3, pos); |
566 | prep.setLong(4, hash); |
567 | prep.setLong(5, block); |
568 | prep.execute(); |
569 | reuse(sql, prep); |
570 | } |
571 | |
572 | @Override |
573 | public Value createBlob(InputStream in, long maxLength) { |
574 | init(); |
575 | return addLob(in, maxLength, Value.BLOB, null); |
576 | } |
577 | |
578 | @Override |
579 | public Value createClob(Reader reader, long maxLength) { |
580 | init(); |
581 | long max = maxLength == -1 ? Long.MAX_VALUE : maxLength; |
582 | CountingReaderInputStream in = new CountingReaderInputStream(reader, max); |
583 | ValueLobDb lob = addLob(in, Long.MAX_VALUE, Value.CLOB, in); |
584 | return lob; |
585 | } |
586 | |
587 | @Override |
588 | public void setTable(ValueLobDb lob, int table) { |
589 | long lobId = lob.getLobId(); |
590 | assertNotHolds(conn.getSession()); |
591 | // see locking discussion at the top |
592 | synchronized (database) { |
593 | synchronized (conn.getSession()) { |
594 | try { |
595 | init(); |
596 | String sql = "UPDATE " + LOBS + " SET TABLE = ? WHERE ID = ?"; |
597 | PreparedStatement prep = prepare(sql); |
598 | prep.setInt(1, table); |
599 | prep.setLong(2, lobId); |
600 | prep.executeUpdate(); |
601 | reuse(sql, prep); |
602 | } catch (SQLException e) { |
603 | throw DbException.convert(e); |
604 | } |
605 | } |
606 | } |
607 | } |
608 | |
609 | private static void assertNotHolds(Object lock) { |
610 | if (Thread.holdsLock(lock)) { |
611 | throw DbException.throwInternalError(); |
612 | } |
613 | } |
614 | |
615 | /** |
616 | * Check whether this thread has synchronized on this object. |
617 | * |
618 | * @param lock the object |
619 | */ |
620 | static void assertHoldsLock(Object lock) { |
621 | if (!Thread.holdsLock(lock)) { |
622 | throw DbException.throwInternalError(); |
623 | } |
624 | } |
625 | |
626 | /** |
627 | * An input stream that reads from a LOB. |
628 | */ |
629 | public class LobInputStream extends InputStream { |
630 | |
631 | /** |
632 | * Data from the LOB_MAP table. We cache this to prevent other updates |
633 | * to the table that contains the LOB column from changing the data |
634 | * under us. |
635 | */ |
636 | private final long[] lobMapBlocks; |
637 | |
638 | /** |
639 | * index into the lobMapBlocks array. |
640 | */ |
641 | private int lobMapIndex; |
642 | |
643 | /** |
644 | * The remaining bytes in the lob. |
645 | */ |
646 | private long remainingBytes; |
647 | |
648 | /** |
649 | * The temporary buffer. |
650 | */ |
651 | private byte[] buffer; |
652 | |
653 | /** |
654 | * The position within the buffer. |
655 | */ |
656 | private int bufferPos; |
657 | |
658 | |
659 | public LobInputStream(long lobId, long byteCount) throws SQLException { |
660 | |
661 | // we have to take the lock on the session |
662 | // before the lock on the database to prevent ABBA deadlocks |
663 | assertHoldsLock(conn.getSession()); |
664 | assertHoldsLock(database); |
665 | |
666 | if (byteCount == -1) { |
667 | String sql = "SELECT BYTE_COUNT FROM " + LOBS + " WHERE ID = ?"; |
668 | PreparedStatement prep = prepare(sql); |
669 | prep.setLong(1, lobId); |
670 | ResultSet rs = prep.executeQuery(); |
671 | if (!rs.next()) { |
672 | throw DbException.get(ErrorCode.IO_EXCEPTION_1, |
673 | "Missing lob entry: " + lobId).getSQLException(); |
674 | } |
675 | byteCount = rs.getLong(1); |
676 | reuse(sql, prep); |
677 | } |
678 | this.remainingBytes = byteCount; |
679 | |
680 | String sql = "SELECT COUNT(*) FROM " + LOB_MAP + " WHERE LOB = ?"; |
681 | PreparedStatement prep = prepare(sql); |
682 | prep.setLong(1, lobId); |
683 | ResultSet rs = prep.executeQuery(); |
684 | rs.next(); |
685 | int lobMapCount = rs.getInt(1); |
686 | if (lobMapCount == 0) { |
687 | throw DbException.get(ErrorCode.IO_EXCEPTION_1, |
688 | "Missing lob entry: " + lobId).getSQLException(); |
689 | } |
690 | reuse(sql, prep); |
691 | |
692 | this.lobMapBlocks = new long[lobMapCount]; |
693 | |
694 | sql = "SELECT BLOCK FROM " + LOB_MAP + " WHERE LOB = ? ORDER BY SEQ"; |
695 | prep = prepare(sql); |
696 | prep.setLong(1, lobId); |
697 | rs = prep.executeQuery(); |
698 | int i = 0; |
699 | while (rs.next()) { |
700 | this.lobMapBlocks[i] = rs.getLong(1); |
701 | i++; |
702 | } |
703 | reuse(sql, prep); |
704 | } |
705 | |
706 | @Override |
707 | public int read() throws IOException { |
708 | fillBuffer(); |
709 | if (remainingBytes <= 0) { |
710 | return -1; |
711 | } |
712 | remainingBytes--; |
713 | return buffer[bufferPos++] & 255; |
714 | } |
715 | |
716 | @Override |
717 | public long skip(long n) throws IOException { |
718 | if (n <= 0) { |
719 | return 0; |
720 | } |
721 | long remaining = n; |
722 | remaining -= skipSmall(remaining); |
723 | if (remaining > BLOCK_LENGTH) { |
724 | while (remaining > BLOCK_LENGTH) { |
725 | remaining -= BLOCK_LENGTH; |
726 | remainingBytes -= BLOCK_LENGTH; |
727 | lobMapIndex++; |
728 | } |
729 | bufferPos = 0; |
730 | buffer = null; |
731 | } |
732 | fillBuffer(); |
733 | remaining -= skipSmall(remaining); |
734 | remaining -= super.skip(remaining); |
735 | return n - remaining; |
736 | } |
737 | |
738 | private int skipSmall(long n) { |
739 | if (buffer != null && bufferPos < buffer.length) { |
740 | int x = MathUtils.convertLongToInt(Math.min(n, buffer.length - bufferPos)); |
741 | bufferPos += x; |
742 | remainingBytes -= x; |
743 | return x; |
744 | } |
745 | return 0; |
746 | } |
747 | |
748 | @Override |
749 | public int available() throws IOException { |
750 | return MathUtils.convertLongToInt(remainingBytes); |
751 | } |
752 | |
753 | @Override |
754 | public int read(byte[] buff) throws IOException { |
755 | return readFully(buff, 0, buff.length); |
756 | } |
757 | |
758 | @Override |
759 | public int read(byte[] buff, int off, int length) throws IOException { |
760 | return readFully(buff, off, length); |
761 | } |
762 | |
763 | private int readFully(byte[] buff, int off, int length) throws IOException { |
764 | if (length == 0) { |
765 | return 0; |
766 | } |
767 | int read = 0; |
768 | while (length > 0) { |
769 | fillBuffer(); |
770 | if (remainingBytes <= 0) { |
771 | break; |
772 | } |
773 | int len = (int) Math.min(length, remainingBytes); |
774 | len = Math.min(len, buffer.length - bufferPos); |
775 | System.arraycopy(buffer, bufferPos, buff, off, len); |
776 | bufferPos += len; |
777 | read += len; |
778 | remainingBytes -= len; |
779 | off += len; |
780 | length -= len; |
781 | } |
782 | return read == 0 ? -1 : read; |
783 | } |
784 | |
785 | private void fillBuffer() throws IOException { |
786 | if (buffer != null && bufferPos < buffer.length) { |
787 | return; |
788 | } |
789 | if (remainingBytes <= 0) { |
790 | return; |
791 | } |
792 | if (lobMapIndex >= lobMapBlocks.length) { |
793 | System.out.println("halt!"); |
794 | } |
795 | try { |
796 | buffer = readBlock(lobMapBlocks[lobMapIndex]); |
797 | lobMapIndex++; |
798 | bufferPos = 0; |
799 | } catch (SQLException e) { |
800 | throw DbException.convertToIOException(e); |
801 | } |
802 | } |
803 | |
804 | } |
805 | |
806 | } |