EMMA Coverage Report (generated Sun Mar 01 22:06:14 CET 2015)
[all classes][org.h2.mvstore]

COVERAGE SUMMARY FOR SOURCE FILE [StreamStore.java]

nameclass, %method, %block, %line, %
StreamStore.java100% (2/2)100% (27/27)95%  (883/929)96%  (229.1/239)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class StreamStore100% (1/1)100% (21/21)92%  (533/579)94%  (152.1/162)
getMaxBlockKey (byte []): long 100% (1/1)38%  (25/66)50%  (9.1/18)
getAndIncrementNextKey (): long 100% (1/1)92%  (56/61)93%  (14/15)
StreamStore (Map): void 100% (1/1)100% (22/22)100% (7/7)
get (byte []): InputStream 100% (1/1)100% (6/6)100% (1/1)
getBlock (long): byte [] 100% (1/1)100% (22/22)100% (4/4)
getMap (): Map 100% (1/1)100% (3/3)100% (1/1)
getMaxBlockSize (): long 100% (1/1)100% (4/4)100% (1/1)
getMinBlockSize (): int 100% (1/1)100% (3/3)100% (1/1)
getNextKey (): long 100% (1/1)100% (4/4)100% (1/1)
isInPlace (byte []): boolean 100% (1/1)100% (24/24)100% (8/8)
length (byte []): long 100% (1/1)100% (59/59)100% (16/16)
onStore (int): void 100% (1/1)100% (1/1)100% (1/1)
put (ByteArrayOutputStream, InputStream, int): boolean 100% (1/1)100% (103/103)100% (29/29)
put (InputStream): byte [] 100% (1/1)100% (48/48)100% (14/14)
putIndirectId (ByteArrayOutputStream): ByteArrayOutputStream 100% (1/1)100% (22/22)100% (6/6)
read (InputStream, byte []): byte [] 100% (1/1)100% (36/36)100% (12/12)
remove (byte []): void 100% (1/1)100% (66/66)100% (17/17)
setMaxBlockSize (int): void 100% (1/1)100% (4/4)100% (2/2)
setMinBlockSize (int): void 100% (1/1)100% (4/4)100% (2/2)
setNextKey (long): void 100% (1/1)100% (5/5)100% (2/2)
writeBlock (byte []): long 100% (1/1)100% (16/16)100% (4/4)
     
class StreamStore$Stream100% (1/1)100% (6/6)100% (350/350)100% (77/77)
StreamStore$Stream (StreamStore, byte []): void 100% (1/1)100% (15/15)100% (5/5)
close (): void 100% (1/1)100% (15/15)100% (4/4)
nextBuffer (): ByteArrayInputStream 100% (1/1)100% (180/180)100% (34/34)
read (): int 100% (1/1)100% (28/28)100% (5/5)
read (byte [], int, int): int 100% (1/1)100% (59/59)100% (16/16)
skip (long): long 100% (1/1)100% (53/53)100% (13/13)

1/*
2 * Copyright 2004-2014 H2 Group. Multiple-Licensed under the MPL 2.0,
3 * and the EPL 1.0 (http://h2database.com/html/license.html).
4 * Initial Developer: H2 Group
5 */
6package org.h2.mvstore;
7 
8import java.io.ByteArrayInputStream;
9import java.io.ByteArrayOutputStream;
10import java.io.IOException;
11import java.io.InputStream;
12import java.nio.ByteBuffer;
13import java.util.Arrays;
14import java.util.Map;
15import java.util.concurrent.atomic.AtomicLong;
16import java.util.concurrent.atomic.AtomicReference;
17 
18/**
19 * A facility to store streams in a map. Streams are split into blocks, which
20 * are stored in a map. Very small streams are inlined in the stream id.
21 * <p>
22 * The key of the map is a long (incremented for each stored block). The default
23 * initial value is 0. Before storing blocks into the map, the stream store
24 * checks if there is already a block with the next key, and if necessary
25 * searches the next free entry using a binary search (0 to Long.MAX_VALUE).
26 * <p>
27 * Before storing
28 * <p>
29 * The format of the binary id is: An empty id represents 0 bytes of data.
30 * In-place data is encoded as 0, the size (a variable size int), then the data.
31 * A stored block is encoded as 1, the length of the block (a variable size
32 * int), then the key (a variable size long). Multiple ids can be concatenated
33 * to concatenate the data. If the id is large, it is stored itself, which is
34 * encoded as 2, the total length (a variable size long), and the key of the
35 * block that contains the id (a variable size long).
36 */
37public class StreamStore {
38 
39    private final Map<Long, byte[]> map;
40    private int minBlockSize = 256;
41    private int maxBlockSize = 256 * 1024;
42    private final AtomicLong nextKey = new AtomicLong();
43    private final AtomicReference<byte[]> nextBuffer =
44            new AtomicReference<byte[]>();
45 
46    /**
47     * Create a stream store instance.
48     *
49     * @param map the map to store blocks of data
50     */
51    public StreamStore(Map<Long, byte[]> map) {
52        this.map = map;
53    }
54 
55    public Map<Long, byte[]> getMap() {
56        return map;
57    }
58 
59    public void setNextKey(long nextKey) {
60        this.nextKey.set(nextKey);
61    }
62 
63    public long getNextKey() {
64        return nextKey.get();
65    }
66 
67    /**
68     * Set the minimum block size. The default is 256 bytes.
69     *
70     * @param minBlockSize the new value
71     */
72    public void setMinBlockSize(int minBlockSize) {
73        this.minBlockSize = minBlockSize;
74    }
75 
76    public int getMinBlockSize() {
77        return minBlockSize;
78    }
79 
80    /**
81     * Set the maximum block size. The default is 256 KB.
82     *
83     * @param maxBlockSize the new value
84     */
85    public void setMaxBlockSize(int maxBlockSize) {
86        this.maxBlockSize = maxBlockSize;
87    }
88 
89    public long getMaxBlockSize() {
90        return maxBlockSize;
91    }
92 
93    /**
94     * Store the stream, and return the id. The stream is not closed.
95     *
96     * @param in the stream
97     * @return the id (potentially an empty array)
98     */
99    public byte[] put(InputStream in) throws IOException {
100        ByteArrayOutputStream id = new ByteArrayOutputStream();
101        int level = 0;
102        try {
103            while (true) {
104                if (put(id, in, level)) {
105                    break;
106                }
107                if (id.size() > maxBlockSize / 2) {
108                    id = putIndirectId(id);
109                    level++;
110                }
111            }
112        } catch (IOException e) {
113            remove(id.toByteArray());
114            throw e;
115        }
116        if (id.size() > minBlockSize * 2) {
117            id = putIndirectId(id);
118        }
119        return id.toByteArray();
120    }
121 
122    private boolean put(ByteArrayOutputStream id, InputStream in, int level)
123            throws IOException {
124        if (level > 0) {
125            ByteArrayOutputStream id2 = new ByteArrayOutputStream();
126            while (true) {
127                boolean eof = put(id2, in, level - 1);
128                if (id2.size() > maxBlockSize / 2) {
129                    id2 = putIndirectId(id2);
130                    id2.writeTo(id);
131                    return eof;
132                } else if (eof) {
133                    id2.writeTo(id);
134                    return true;
135                }
136            }
137        }
138        byte[] readBuffer = nextBuffer.getAndSet(null);
139        if (readBuffer == null) {
140            readBuffer = new byte[maxBlockSize];
141        }
142        byte[] buff = read(in, readBuffer);
143        if (buff != readBuffer) {
144            // re-use the buffer if the result was shorter
145            nextBuffer.set(readBuffer);
146        }
147        int len = buff.length;
148        if (len == 0) {
149            return true;
150        }
151        boolean eof = len < maxBlockSize;
152        if (len < minBlockSize) {
153            // in-place: 0, len (int), data
154            id.write(0);
155            DataUtils.writeVarInt(id, len);
156            id.write(buff);
157        } else {
158            // block: 1, len (int), blockId (long)
159            id.write(1);
160            DataUtils.writeVarInt(id, len);
161            DataUtils.writeVarLong(id, writeBlock(buff));
162        }
163        return eof;
164    }
165 
166    private static byte[] read(InputStream in, byte[] target)
167            throws IOException {
168        int copied = 0;
169        int remaining = target.length;
170        while (remaining > 0) {
171            try {
172                int len = in.read(target, copied, remaining);
173                if (len < 0) {
174                    return Arrays.copyOf(target, copied);
175                }
176                copied += len;
177                remaining -= len;
178            } catch (RuntimeException e) {
179                throw new IOException(e);
180            }
181        }
182        return target;
183    }
184 
185    private ByteArrayOutputStream putIndirectId(ByteArrayOutputStream id)
186            throws IOException {
187        byte[] data = id.toByteArray();
188        id = new ByteArrayOutputStream();
189        // indirect: 2, total len (long), blockId (long)
190        id.write(2);
191        DataUtils.writeVarLong(id, length(data));
192        DataUtils.writeVarLong(id, writeBlock(data));
193        return id;
194    }
195 
196    private long writeBlock(byte[] data) {
197        long key = getAndIncrementNextKey();
198        map.put(key, data);
199        onStore(data.length);
200        return key;
201    }
202 
203    /**
204     * This method is called after a block of data is stored. Override this
205     * method to persist data if necessary.
206     *
207     * @param len the length of the stored block.
208     */
209    protected void onStore(int len) {
210        // do nothing by default
211    }
212 
213    /**
214     * Generate a new key.
215     *
216     * @return the new key
217     */
218    private long getAndIncrementNextKey() {
219        long key = nextKey.getAndIncrement();
220        if (!map.containsKey(key)) {
221            return key;
222        }
223        // search the next free id using binary search
224        synchronized (this) {
225            long low = key, high = Long.MAX_VALUE;
226            while (low < high) {
227                long x = (low + high) >>> 1;
228                if (map.containsKey(x)) {
229                    low = x + 1;
230                } else {
231                    high = x;
232                }
233            }
234            key = low;
235            nextKey.set(key + 1);
236            return key;
237        }
238    }
239 
240    /**
241     * Get the key of the biggest block, of -1 for inline data.
242     * This method is used to garbage collect orphaned blocks.
243     *
244     * @param id the id
245     * @return the key, or -1
246     */
247    public long getMaxBlockKey(byte[] id) {
248        long maxKey = -1;
249        ByteBuffer idBuffer = ByteBuffer.wrap(id);
250        while (idBuffer.hasRemaining()) {
251            switch (idBuffer.get()) {
252            case 0:
253                // in-place: 0, len (int), data
254                int len = DataUtils.readVarInt(idBuffer);
255                idBuffer.position(idBuffer.position() + len);
256                break;
257            case 1:
258                // block: 1, len (int), blockId (long)
259                DataUtils.readVarInt(idBuffer);
260                long k = DataUtils.readVarLong(idBuffer);
261                maxKey = Math.max(maxKey, k);
262                break;
263            case 2:
264                // indirect: 2, total len (long), blockId (long)
265                DataUtils.readVarLong(idBuffer);
266                long k2 = DataUtils.readVarLong(idBuffer);
267                // recurse
268                byte[] r = map.get(k2);
269                maxKey = Math.max(maxKey, getMaxBlockKey(r));
270                break;
271            default:
272                throw DataUtils.newIllegalArgumentException(
273                        "Unsupported id {0}", Arrays.toString(id));
274            }
275        }
276        return maxKey;
277    }
278 
279    /**
280     * Remove all stored blocks for the given id.
281     *
282     * @param id the id
283     */
284    public void remove(byte[] id) {
285        ByteBuffer idBuffer = ByteBuffer.wrap(id);
286        while (idBuffer.hasRemaining()) {
287            switch (idBuffer.get()) {
288            case 0:
289                // in-place: 0, len (int), data
290                int len = DataUtils.readVarInt(idBuffer);
291                idBuffer.position(idBuffer.position() + len);
292                break;
293            case 1:
294                // block: 1, len (int), blockId (long)
295                DataUtils.readVarInt(idBuffer);
296                long k = DataUtils.readVarLong(idBuffer);
297                map.remove(k);
298                break;
299            case 2:
300                // indirect: 2, total len (long), blockId (long)
301                DataUtils.readVarLong(idBuffer);
302                long k2 = DataUtils.readVarLong(idBuffer);
303                // recurse
304                remove(map.get(k2));
305                map.remove(k2);
306                break;
307            default:
308                throw DataUtils.newIllegalArgumentException(
309                        "Unsupported id {0}", Arrays.toString(id));
310            }
311        }
312    }
313 
314    /**
315     * Calculate the number of data bytes for the given id. As the length is
316     * encoded in the id, this operation does not cause any reads in the map.
317     *
318     * @param id the id
319     * @return the length
320     */
321    public long length(byte[] id) {
322        ByteBuffer idBuffer = ByteBuffer.wrap(id);
323        long length = 0;
324        while (idBuffer.hasRemaining()) {
325            switch (idBuffer.get()) {
326            case 0:
327                // in-place: 0, len (int), data
328                int len = DataUtils.readVarInt(idBuffer);
329                idBuffer.position(idBuffer.position() + len);
330                length += len;
331                break;
332            case 1:
333                // block: 1, len (int), blockId (long)
334                length += DataUtils.readVarInt(idBuffer);
335                DataUtils.readVarLong(idBuffer);
336                break;
337            case 2:
338                // indirect: 2, total len (long), blockId (long)
339                length += DataUtils.readVarLong(idBuffer);
340                DataUtils.readVarLong(idBuffer);
341                break;
342            default:
343                throw DataUtils.newIllegalArgumentException(
344                        "Unsupported id {0}", Arrays.toString(id));
345            }
346        }
347        return length;
348    }
349 
350    /**
351     * Check whether the id itself contains all the data. This operation does
352     * not cause any reads in the map.
353     *
354     * @param id the id
355     * @return if the id contains the data
356     */
357    public boolean isInPlace(byte[] id) {
358        ByteBuffer idBuffer = ByteBuffer.wrap(id);
359        while (idBuffer.hasRemaining()) {
360            if (idBuffer.get() != 0) {
361                return false;
362            }
363            int len = DataUtils.readVarInt(idBuffer);
364            idBuffer.position(idBuffer.position() + len);
365        }
366        return true;
367    }
368 
369    /**
370     * Open an input stream to read data.
371     *
372     * @param id the id
373     * @return the stream
374     */
375    public InputStream get(byte[] id) {
376        return new Stream(this, id);
377    }
378 
379    /**
380     * Get the block.
381     *
382     * @param key the key
383     * @return the block
384     */
385    byte[] getBlock(long key) {
386        byte[] data = map.get(key);
387        if (data == null) {
388            throw DataUtils.newIllegalStateException(
389                    DataUtils.ERROR_BLOCK_NOT_FOUND,
390                    "Block {0} not found",  key);
391        }
392        return data;
393    }
394 
395    /**
396     * A stream backed by a map.
397     */
398    static class Stream extends InputStream {
399 
400        private final StreamStore store;
401        private byte[] oneByteBuffer;
402        private ByteBuffer idBuffer;
403        private ByteArrayInputStream buffer;
404        private long skip;
405        private final long length;
406        private long pos;
407 
408        Stream(StreamStore store, byte[] id) {
409            this.store = store;
410            this.length = store.length(id);
411            this.idBuffer = ByteBuffer.wrap(id);
412        }
413 
414        @Override
415        public int read() throws IOException {
416            byte[] buffer = oneByteBuffer;
417            if (buffer == null) {
418                buffer = oneByteBuffer = new byte[1];
419            }
420            int len = read(buffer, 0, 1);
421            return len == -1 ? -1 : (buffer[0] & 255);
422        }
423 
424        @Override
425        public long skip(long n) {
426            n = Math.min(length - pos, n);
427            if (n == 0) {
428                return 0;
429            }
430            if (buffer != null) {
431                long s = buffer.skip(n);
432                if (s > 0) {
433                    n = s;
434                } else {
435                    buffer = null;
436                    skip += n;
437                }
438            } else {
439                skip += n;
440            }
441            pos += n;
442            return n;
443        }
444 
445        @Override
446        public void close() {
447            buffer = null;
448            idBuffer.position(idBuffer.limit());
449            pos = length;
450        }
451 
452        @Override
453        public int read(byte[] b, int off, int len) throws IOException {
454            if (len <= 0) {
455                return 0;
456            }
457            while (true) {
458                if (buffer == null) {
459                    try {
460                        buffer = nextBuffer();
461                    } catch (IllegalStateException e) {
462                        String msg = DataUtils.formatMessage(
463                                DataUtils.ERROR_BLOCK_NOT_FOUND,
464                                "Block not found in id {0}",
465                                Arrays.toString(idBuffer.array()));
466                        throw new IOException(msg, e);
467                    }
468                    if (buffer == null) {
469                        return -1;
470                    }
471                }
472                int result = buffer.read(b, off, len);
473                if (result > 0) {
474                    pos += result;
475                    return result;
476                }
477                buffer = null;
478            }
479        }
480 
481        private ByteArrayInputStream nextBuffer() {
482            while (idBuffer.hasRemaining()) {
483                switch (idBuffer.get()) {
484                case 0: {
485                    int len = DataUtils.readVarInt(idBuffer);
486                    if (skip >= len) {
487                        skip -= len;
488                        idBuffer.position(idBuffer.position() + len);
489                        continue;
490                    }
491                    int p = (int) (idBuffer.position() + skip);
492                    int l = (int) (len - skip);
493                    idBuffer.position(p + l);
494                    return new ByteArrayInputStream(idBuffer.array(), p, l);
495                }
496                case 1: {
497                    int len = DataUtils.readVarInt(idBuffer);
498                    long key = DataUtils.readVarLong(idBuffer);
499                    if (skip >= len) {
500                        skip -= len;
501                        continue;
502                    }
503                    byte[] data = store.getBlock(key);
504                    int s = (int) skip;
505                    skip = 0;
506                    return new ByteArrayInputStream(data, s, data.length - s);
507                }
508                case 2: {
509                    long len = DataUtils.readVarLong(idBuffer);
510                    long key = DataUtils.readVarLong(idBuffer);
511                    if (skip >= len) {
512                        skip -= len;
513                        continue;
514                    }
515                    byte[] k = store.getBlock(key);
516                    ByteBuffer newBuffer = ByteBuffer.allocate(k.length
517                            + idBuffer.limit() - idBuffer.position());
518                    newBuffer.put(k);
519                    newBuffer.put(idBuffer);
520                    newBuffer.flip();
521                    idBuffer = newBuffer;
522                    return nextBuffer();
523                }
524                default:
525                    throw DataUtils.newIllegalArgumentException(
526                            "Unsupported id {0}",
527                            Arrays.toString(idBuffer.array()));
528                }
529            }
530            return null;
531        }
532 
533    }
534 
535}

[all classes][org.h2.mvstore]
EMMA 2.0.5312 (C) Vladimir Roubtsov