1 | /* |
2 | * Copyright 2004-2014 H2 Group. Multiple-Licensed under the MPL 2.0, |
3 | * and the EPL 1.0 (http://h2database.com/html/license.html). |
4 | * Initial Developer: H2 Group |
5 | */ |
6 | package org.h2.mvstore; |
7 | |
8 | import java.io.ByteArrayInputStream; |
9 | import java.io.ByteArrayOutputStream; |
10 | import java.io.IOException; |
11 | import java.io.InputStream; |
12 | import java.nio.ByteBuffer; |
13 | import java.util.Arrays; |
14 | import java.util.Map; |
15 | import java.util.concurrent.atomic.AtomicLong; |
16 | import java.util.concurrent.atomic.AtomicReference; |
17 | |
18 | /** |
19 | * A facility to store streams in a map. Streams are split into blocks, which |
20 | * are stored in a map. Very small streams are inlined in the stream id. |
21 | * <p> |
22 | * The key of the map is a long (incremented for each stored block). The default |
23 | * initial value is 0. Before storing blocks into the map, the stream store |
24 | * checks if there is already a block with the next key, and if necessary |
25 | * searches the next free entry using a binary search (0 to Long.MAX_VALUE). |
26 | * <p> |
27 | * Before storing |
28 | * <p> |
29 | * The format of the binary id is: An empty id represents 0 bytes of data. |
30 | * In-place data is encoded as 0, the size (a variable size int), then the data. |
31 | * A stored block is encoded as 1, the length of the block (a variable size |
32 | * int), then the key (a variable size long). Multiple ids can be concatenated |
33 | * to concatenate the data. If the id is large, it is stored itself, which is |
34 | * encoded as 2, the total length (a variable size long), and the key of the |
35 | * block that contains the id (a variable size long). |
36 | */ |
37 | public class StreamStore { |
38 | |
39 | private final Map<Long, byte[]> map; |
40 | private int minBlockSize = 256; |
41 | private int maxBlockSize = 256 * 1024; |
42 | private final AtomicLong nextKey = new AtomicLong(); |
43 | private final AtomicReference<byte[]> nextBuffer = |
44 | new AtomicReference<byte[]>(); |
45 | |
46 | /** |
47 | * Create a stream store instance. |
48 | * |
49 | * @param map the map to store blocks of data |
50 | */ |
51 | public StreamStore(Map<Long, byte[]> map) { |
52 | this.map = map; |
53 | } |
54 | |
55 | public Map<Long, byte[]> getMap() { |
56 | return map; |
57 | } |
58 | |
59 | public void setNextKey(long nextKey) { |
60 | this.nextKey.set(nextKey); |
61 | } |
62 | |
63 | public long getNextKey() { |
64 | return nextKey.get(); |
65 | } |
66 | |
67 | /** |
68 | * Set the minimum block size. The default is 256 bytes. |
69 | * |
70 | * @param minBlockSize the new value |
71 | */ |
72 | public void setMinBlockSize(int minBlockSize) { |
73 | this.minBlockSize = minBlockSize; |
74 | } |
75 | |
76 | public int getMinBlockSize() { |
77 | return minBlockSize; |
78 | } |
79 | |
80 | /** |
81 | * Set the maximum block size. The default is 256 KB. |
82 | * |
83 | * @param maxBlockSize the new value |
84 | */ |
85 | public void setMaxBlockSize(int maxBlockSize) { |
86 | this.maxBlockSize = maxBlockSize; |
87 | } |
88 | |
89 | public long getMaxBlockSize() { |
90 | return maxBlockSize; |
91 | } |
92 | |
93 | /** |
94 | * Store the stream, and return the id. The stream is not closed. |
95 | * |
96 | * @param in the stream |
97 | * @return the id (potentially an empty array) |
98 | */ |
99 | public byte[] put(InputStream in) throws IOException { |
100 | ByteArrayOutputStream id = new ByteArrayOutputStream(); |
101 | int level = 0; |
102 | try { |
103 | while (true) { |
104 | if (put(id, in, level)) { |
105 | break; |
106 | } |
107 | if (id.size() > maxBlockSize / 2) { |
108 | id = putIndirectId(id); |
109 | level++; |
110 | } |
111 | } |
112 | } catch (IOException e) { |
113 | remove(id.toByteArray()); |
114 | throw e; |
115 | } |
116 | if (id.size() > minBlockSize * 2) { |
117 | id = putIndirectId(id); |
118 | } |
119 | return id.toByteArray(); |
120 | } |
121 | |
122 | private boolean put(ByteArrayOutputStream id, InputStream in, int level) |
123 | throws IOException { |
124 | if (level > 0) { |
125 | ByteArrayOutputStream id2 = new ByteArrayOutputStream(); |
126 | while (true) { |
127 | boolean eof = put(id2, in, level - 1); |
128 | if (id2.size() > maxBlockSize / 2) { |
129 | id2 = putIndirectId(id2); |
130 | id2.writeTo(id); |
131 | return eof; |
132 | } else if (eof) { |
133 | id2.writeTo(id); |
134 | return true; |
135 | } |
136 | } |
137 | } |
138 | byte[] readBuffer = nextBuffer.getAndSet(null); |
139 | if (readBuffer == null) { |
140 | readBuffer = new byte[maxBlockSize]; |
141 | } |
142 | byte[] buff = read(in, readBuffer); |
143 | if (buff != readBuffer) { |
144 | // re-use the buffer if the result was shorter |
145 | nextBuffer.set(readBuffer); |
146 | } |
147 | int len = buff.length; |
148 | if (len == 0) { |
149 | return true; |
150 | } |
151 | boolean eof = len < maxBlockSize; |
152 | if (len < minBlockSize) { |
153 | // in-place: 0, len (int), data |
154 | id.write(0); |
155 | DataUtils.writeVarInt(id, len); |
156 | id.write(buff); |
157 | } else { |
158 | // block: 1, len (int), blockId (long) |
159 | id.write(1); |
160 | DataUtils.writeVarInt(id, len); |
161 | DataUtils.writeVarLong(id, writeBlock(buff)); |
162 | } |
163 | return eof; |
164 | } |
165 | |
166 | private static byte[] read(InputStream in, byte[] target) |
167 | throws IOException { |
168 | int copied = 0; |
169 | int remaining = target.length; |
170 | while (remaining > 0) { |
171 | try { |
172 | int len = in.read(target, copied, remaining); |
173 | if (len < 0) { |
174 | return Arrays.copyOf(target, copied); |
175 | } |
176 | copied += len; |
177 | remaining -= len; |
178 | } catch (RuntimeException e) { |
179 | throw new IOException(e); |
180 | } |
181 | } |
182 | return target; |
183 | } |
184 | |
185 | private ByteArrayOutputStream putIndirectId(ByteArrayOutputStream id) |
186 | throws IOException { |
187 | byte[] data = id.toByteArray(); |
188 | id = new ByteArrayOutputStream(); |
189 | // indirect: 2, total len (long), blockId (long) |
190 | id.write(2); |
191 | DataUtils.writeVarLong(id, length(data)); |
192 | DataUtils.writeVarLong(id, writeBlock(data)); |
193 | return id; |
194 | } |
195 | |
196 | private long writeBlock(byte[] data) { |
197 | long key = getAndIncrementNextKey(); |
198 | map.put(key, data); |
199 | onStore(data.length); |
200 | return key; |
201 | } |
202 | |
203 | /** |
204 | * This method is called after a block of data is stored. Override this |
205 | * method to persist data if necessary. |
206 | * |
207 | * @param len the length of the stored block. |
208 | */ |
209 | protected void onStore(int len) { |
210 | // do nothing by default |
211 | } |
212 | |
213 | /** |
214 | * Generate a new key. |
215 | * |
216 | * @return the new key |
217 | */ |
218 | private long getAndIncrementNextKey() { |
219 | long key = nextKey.getAndIncrement(); |
220 | if (!map.containsKey(key)) { |
221 | return key; |
222 | } |
223 | // search the next free id using binary search |
224 | synchronized (this) { |
225 | long low = key, high = Long.MAX_VALUE; |
226 | while (low < high) { |
227 | long x = (low + high) >>> 1; |
228 | if (map.containsKey(x)) { |
229 | low = x + 1; |
230 | } else { |
231 | high = x; |
232 | } |
233 | } |
234 | key = low; |
235 | nextKey.set(key + 1); |
236 | return key; |
237 | } |
238 | } |
239 | |
240 | /** |
241 | * Get the key of the biggest block, of -1 for inline data. |
242 | * This method is used to garbage collect orphaned blocks. |
243 | * |
244 | * @param id the id |
245 | * @return the key, or -1 |
246 | */ |
247 | public long getMaxBlockKey(byte[] id) { |
248 | long maxKey = -1; |
249 | ByteBuffer idBuffer = ByteBuffer.wrap(id); |
250 | while (idBuffer.hasRemaining()) { |
251 | switch (idBuffer.get()) { |
252 | case 0: |
253 | // in-place: 0, len (int), data |
254 | int len = DataUtils.readVarInt(idBuffer); |
255 | idBuffer.position(idBuffer.position() + len); |
256 | break; |
257 | case 1: |
258 | // block: 1, len (int), blockId (long) |
259 | DataUtils.readVarInt(idBuffer); |
260 | long k = DataUtils.readVarLong(idBuffer); |
261 | maxKey = Math.max(maxKey, k); |
262 | break; |
263 | case 2: |
264 | // indirect: 2, total len (long), blockId (long) |
265 | DataUtils.readVarLong(idBuffer); |
266 | long k2 = DataUtils.readVarLong(idBuffer); |
267 | // recurse |
268 | byte[] r = map.get(k2); |
269 | maxKey = Math.max(maxKey, getMaxBlockKey(r)); |
270 | break; |
271 | default: |
272 | throw DataUtils.newIllegalArgumentException( |
273 | "Unsupported id {0}", Arrays.toString(id)); |
274 | } |
275 | } |
276 | return maxKey; |
277 | } |
278 | |
279 | /** |
280 | * Remove all stored blocks for the given id. |
281 | * |
282 | * @param id the id |
283 | */ |
284 | public void remove(byte[] id) { |
285 | ByteBuffer idBuffer = ByteBuffer.wrap(id); |
286 | while (idBuffer.hasRemaining()) { |
287 | switch (idBuffer.get()) { |
288 | case 0: |
289 | // in-place: 0, len (int), data |
290 | int len = DataUtils.readVarInt(idBuffer); |
291 | idBuffer.position(idBuffer.position() + len); |
292 | break; |
293 | case 1: |
294 | // block: 1, len (int), blockId (long) |
295 | DataUtils.readVarInt(idBuffer); |
296 | long k = DataUtils.readVarLong(idBuffer); |
297 | map.remove(k); |
298 | break; |
299 | case 2: |
300 | // indirect: 2, total len (long), blockId (long) |
301 | DataUtils.readVarLong(idBuffer); |
302 | long k2 = DataUtils.readVarLong(idBuffer); |
303 | // recurse |
304 | remove(map.get(k2)); |
305 | map.remove(k2); |
306 | break; |
307 | default: |
308 | throw DataUtils.newIllegalArgumentException( |
309 | "Unsupported id {0}", Arrays.toString(id)); |
310 | } |
311 | } |
312 | } |
313 | |
314 | /** |
315 | * Calculate the number of data bytes for the given id. As the length is |
316 | * encoded in the id, this operation does not cause any reads in the map. |
317 | * |
318 | * @param id the id |
319 | * @return the length |
320 | */ |
321 | public long length(byte[] id) { |
322 | ByteBuffer idBuffer = ByteBuffer.wrap(id); |
323 | long length = 0; |
324 | while (idBuffer.hasRemaining()) { |
325 | switch (idBuffer.get()) { |
326 | case 0: |
327 | // in-place: 0, len (int), data |
328 | int len = DataUtils.readVarInt(idBuffer); |
329 | idBuffer.position(idBuffer.position() + len); |
330 | length += len; |
331 | break; |
332 | case 1: |
333 | // block: 1, len (int), blockId (long) |
334 | length += DataUtils.readVarInt(idBuffer); |
335 | DataUtils.readVarLong(idBuffer); |
336 | break; |
337 | case 2: |
338 | // indirect: 2, total len (long), blockId (long) |
339 | length += DataUtils.readVarLong(idBuffer); |
340 | DataUtils.readVarLong(idBuffer); |
341 | break; |
342 | default: |
343 | throw DataUtils.newIllegalArgumentException( |
344 | "Unsupported id {0}", Arrays.toString(id)); |
345 | } |
346 | } |
347 | return length; |
348 | } |
349 | |
350 | /** |
351 | * Check whether the id itself contains all the data. This operation does |
352 | * not cause any reads in the map. |
353 | * |
354 | * @param id the id |
355 | * @return if the id contains the data |
356 | */ |
357 | public boolean isInPlace(byte[] id) { |
358 | ByteBuffer idBuffer = ByteBuffer.wrap(id); |
359 | while (idBuffer.hasRemaining()) { |
360 | if (idBuffer.get() != 0) { |
361 | return false; |
362 | } |
363 | int len = DataUtils.readVarInt(idBuffer); |
364 | idBuffer.position(idBuffer.position() + len); |
365 | } |
366 | return true; |
367 | } |
368 | |
369 | /** |
370 | * Open an input stream to read data. |
371 | * |
372 | * @param id the id |
373 | * @return the stream |
374 | */ |
375 | public InputStream get(byte[] id) { |
376 | return new Stream(this, id); |
377 | } |
378 | |
379 | /** |
380 | * Get the block. |
381 | * |
382 | * @param key the key |
383 | * @return the block |
384 | */ |
385 | byte[] getBlock(long key) { |
386 | byte[] data = map.get(key); |
387 | if (data == null) { |
388 | throw DataUtils.newIllegalStateException( |
389 | DataUtils.ERROR_BLOCK_NOT_FOUND, |
390 | "Block {0} not found", key); |
391 | } |
392 | return data; |
393 | } |
394 | |
395 | /** |
396 | * A stream backed by a map. |
397 | */ |
398 | static class Stream extends InputStream { |
399 | |
400 | private final StreamStore store; |
401 | private byte[] oneByteBuffer; |
402 | private ByteBuffer idBuffer; |
403 | private ByteArrayInputStream buffer; |
404 | private long skip; |
405 | private final long length; |
406 | private long pos; |
407 | |
408 | Stream(StreamStore store, byte[] id) { |
409 | this.store = store; |
410 | this.length = store.length(id); |
411 | this.idBuffer = ByteBuffer.wrap(id); |
412 | } |
413 | |
414 | @Override |
415 | public int read() throws IOException { |
416 | byte[] buffer = oneByteBuffer; |
417 | if (buffer == null) { |
418 | buffer = oneByteBuffer = new byte[1]; |
419 | } |
420 | int len = read(buffer, 0, 1); |
421 | return len == -1 ? -1 : (buffer[0] & 255); |
422 | } |
423 | |
424 | @Override |
425 | public long skip(long n) { |
426 | n = Math.min(length - pos, n); |
427 | if (n == 0) { |
428 | return 0; |
429 | } |
430 | if (buffer != null) { |
431 | long s = buffer.skip(n); |
432 | if (s > 0) { |
433 | n = s; |
434 | } else { |
435 | buffer = null; |
436 | skip += n; |
437 | } |
438 | } else { |
439 | skip += n; |
440 | } |
441 | pos += n; |
442 | return n; |
443 | } |
444 | |
445 | @Override |
446 | public void close() { |
447 | buffer = null; |
448 | idBuffer.position(idBuffer.limit()); |
449 | pos = length; |
450 | } |
451 | |
452 | @Override |
453 | public int read(byte[] b, int off, int len) throws IOException { |
454 | if (len <= 0) { |
455 | return 0; |
456 | } |
457 | while (true) { |
458 | if (buffer == null) { |
459 | try { |
460 | buffer = nextBuffer(); |
461 | } catch (IllegalStateException e) { |
462 | String msg = DataUtils.formatMessage( |
463 | DataUtils.ERROR_BLOCK_NOT_FOUND, |
464 | "Block not found in id {0}", |
465 | Arrays.toString(idBuffer.array())); |
466 | throw new IOException(msg, e); |
467 | } |
468 | if (buffer == null) { |
469 | return -1; |
470 | } |
471 | } |
472 | int result = buffer.read(b, off, len); |
473 | if (result > 0) { |
474 | pos += result; |
475 | return result; |
476 | } |
477 | buffer = null; |
478 | } |
479 | } |
480 | |
481 | private ByteArrayInputStream nextBuffer() { |
482 | while (idBuffer.hasRemaining()) { |
483 | switch (idBuffer.get()) { |
484 | case 0: { |
485 | int len = DataUtils.readVarInt(idBuffer); |
486 | if (skip >= len) { |
487 | skip -= len; |
488 | idBuffer.position(idBuffer.position() + len); |
489 | continue; |
490 | } |
491 | int p = (int) (idBuffer.position() + skip); |
492 | int l = (int) (len - skip); |
493 | idBuffer.position(p + l); |
494 | return new ByteArrayInputStream(idBuffer.array(), p, l); |
495 | } |
496 | case 1: { |
497 | int len = DataUtils.readVarInt(idBuffer); |
498 | long key = DataUtils.readVarLong(idBuffer); |
499 | if (skip >= len) { |
500 | skip -= len; |
501 | continue; |
502 | } |
503 | byte[] data = store.getBlock(key); |
504 | int s = (int) skip; |
505 | skip = 0; |
506 | return new ByteArrayInputStream(data, s, data.length - s); |
507 | } |
508 | case 2: { |
509 | long len = DataUtils.readVarLong(idBuffer); |
510 | long key = DataUtils.readVarLong(idBuffer); |
511 | if (skip >= len) { |
512 | skip -= len; |
513 | continue; |
514 | } |
515 | byte[] k = store.getBlock(key); |
516 | ByteBuffer newBuffer = ByteBuffer.allocate(k.length |
517 | + idBuffer.limit() - idBuffer.position()); |
518 | newBuffer.put(k); |
519 | newBuffer.put(idBuffer); |
520 | newBuffer.flip(); |
521 | idBuffer = newBuffer; |
522 | return nextBuffer(); |
523 | } |
524 | default: |
525 | throw DataUtils.newIllegalArgumentException( |
526 | "Unsupported id {0}", |
527 | Arrays.toString(idBuffer.array())); |
528 | } |
529 | } |
530 | return null; |
531 | } |
532 | |
533 | } |
534 | |
535 | } |