1 | /* |
2 | * Copyright 2004-2014 H2 Group. Multiple-Licensed under the MPL 2.0, |
3 | * and the EPL 1.0 (http://h2database.com/html/license.html). |
4 | * Initial Developer: H2 Group |
5 | */ |
6 | package org.h2.mvstore.db; |
7 | |
8 | import java.nio.ByteBuffer; |
9 | import java.util.ArrayList; |
10 | import java.util.BitSet; |
11 | import java.util.HashMap; |
12 | import java.util.Iterator; |
13 | import java.util.List; |
14 | import java.util.Map.Entry; |
15 | |
16 | import org.h2.mvstore.Cursor; |
17 | import org.h2.mvstore.DataUtils; |
18 | import org.h2.mvstore.MVMap; |
19 | import org.h2.mvstore.MVStore; |
20 | import org.h2.mvstore.WriteBuffer; |
21 | import org.h2.mvstore.type.DataType; |
22 | import org.h2.mvstore.type.ObjectDataType; |
23 | import org.h2.util.New; |
24 | |
25 | /** |
26 | * A store that supports concurrent MVCC read-committed transactions. |
27 | */ |
28 | public class TransactionStore { |
29 | |
/**
 * The underlying store that holds all maps used by this transaction store.
 */
final MVStore store;

/**
 * The persisted map of prepared transactions (opened under the map name
 * "openTransactions"). Entries are written for transactions that are
 * prepared or that have a name.
 * Key: transactionId, value: [ status, name ].
 */
final MVMap<Integer, Object[]> preparedTransactions;

/**
 * The undo log.
 * <p>
 * If the first entry for a transaction doesn't have a logId
 * of 0, then the transaction is partially committed (which means rollback
 * is not possible). Log entries are written before the data is changed
 * (write-ahead).
 * <p>
 * Key: [ opId ], value: [ mapId, key, oldValue ]. The opId combines the
 * transaction id (upper 24 bits) and the log id (lower 40 bits).
 */
final MVMap<Long, Object[]> undoLog;

/**
 * The data maps that were already opened, by map id.
 */
private HashMap<Integer, MVMap<Object, VersionedValue>> maps =
        New.hashMap();

/**
 * The data type used for keys and values when a map is opened by id, and
 * for the keys of temporary maps.
 */
private final DataType dataType;

/**
 * The transaction ids that are currently in use: a bit is set when a
 * transaction begins (or is found in the undo log during init), and
 * cleared when the transaction ends.
 */
private final BitSet openTransactions = new BitSet();

/**
 * Whether init() was called; transactions can only begin afterwards.
 */
private boolean init;

/**
 * The largest transaction id that begin() may hand out.
 */
private int maxTransactionId = 0xffff;

/**
 * The next id of a temporary map.
 */
private int nextTempMapId;
71 | |
/**
 * Create a new transaction store that uses a generic object data type
 * for map keys and values.
 *
 * @param store the store
 */
public TransactionStore(MVStore store) {
    this(store, new ObjectDataType());
}
80 | |
81 | /** |
82 | * Create a new transaction store. |
83 | * |
84 | * @param store the store |
85 | * @param dataType the data type for map keys and values |
86 | */ |
87 | public TransactionStore(MVStore store, DataType dataType) { |
88 | this.store = store; |
89 | this.dataType = dataType; |
90 | preparedTransactions = store.openMap("openTransactions", |
91 | new MVMap.Builder<Integer, Object[]>()); |
92 | VersionedValueType oldValueType = new VersionedValueType(dataType); |
93 | ArrayType undoLogValueType = new ArrayType(new DataType[]{ |
94 | new ObjectDataType(), dataType, oldValueType |
95 | }); |
96 | MVMap.Builder<Long, Object[]> builder = |
97 | new MVMap.Builder<Long, Object[]>(). |
98 | valueType(undoLogValueType); |
99 | undoLog = store.openMap("undoLog", builder); |
100 | if (undoLog.getValueType() != undoLogValueType) { |
101 | throw DataUtils.newIllegalStateException( |
102 | DataUtils.ERROR_TRANSACTION_CORRUPT, |
103 | "Undo map open with a different value type"); |
104 | } |
105 | } |
106 | |
107 | /** |
108 | * Initialize the store. This is needed before a transaction can be opened. |
109 | * If the transaction store is corrupt, this method can throw an exception, |
110 | * in which case the store can only be used for reading. |
111 | */ |
112 | public synchronized void init() { |
113 | init = true; |
114 | // remove all temporary maps |
115 | for (String mapName : store.getMapNames()) { |
116 | if (mapName.startsWith("temp.")) { |
117 | MVMap<Object, Integer> temp = openTempMap(mapName); |
118 | store.removeMap(temp); |
119 | } |
120 | } |
121 | synchronized (undoLog) { |
122 | if (undoLog.size() > 0) { |
123 | for (Long key : undoLog.keySet()) { |
124 | int transactionId = getTransactionId(key); |
125 | openTransactions.set(transactionId); |
126 | } |
127 | } |
128 | } |
129 | } |
130 | |
/**
 * Set the maximum transaction id, after which ids are re-used. If the old
 * transaction is still in use when re-using an old id, the new transaction
 * fails.
 * <p>
 * The value is stored as-is, without validation.
 *
 * @param max the maximum id
 */
public void setMaxTransactionId(int max) {
    this.maxTransactionId = max;
}
141 | |
142 | /** |
143 | * Combine the transaction id and the log id to an operation id. |
144 | * |
145 | * @param transactionId the transaction id |
146 | * @param logId the log id |
147 | * @return the operation id |
148 | */ |
149 | static long getOperationId(int transactionId, long logId) { |
150 | DataUtils.checkArgument(transactionId >= 0 && transactionId < (1 << 24), |
151 | "Transaction id out of range: {0}", transactionId); |
152 | DataUtils.checkArgument(logId >= 0 && logId < (1L << 40), |
153 | "Transaction log id out of range: {0}", logId); |
154 | return ((long) transactionId << 40) | logId; |
155 | } |
156 | |
157 | /** |
158 | * Get the transaction id for the given operation id. |
159 | * |
160 | * @param operationId the operation id |
161 | * @return the transaction id |
162 | */ |
163 | static int getTransactionId(long operationId) { |
164 | return (int) (operationId >>> 40); |
165 | } |
166 | |
167 | /** |
168 | * Get the log id for the given operation id. |
169 | * |
170 | * @param operationId the operation id |
171 | * @return the log id |
172 | */ |
173 | static long getLogId(long operationId) { |
174 | return operationId & ((1L << 40) - 1); |
175 | } |
176 | |
177 | /** |
178 | * Get the list of unclosed transactions that have pending writes. |
179 | * |
180 | * @return the list of transactions (sorted by id) |
181 | */ |
182 | public List<Transaction> getOpenTransactions() { |
183 | synchronized (undoLog) { |
184 | ArrayList<Transaction> list = New.arrayList(); |
185 | Long key = undoLog.firstKey(); |
186 | while (key != null) { |
187 | int transactionId = getTransactionId(key); |
188 | key = undoLog.lowerKey(getOperationId(transactionId + 1, 0)); |
189 | long logId = getLogId(key) + 1; |
190 | Object[] data = preparedTransactions.get(transactionId); |
191 | int status; |
192 | String name; |
193 | if (data == null) { |
194 | if (undoLog.containsKey(getOperationId(transactionId, 0))) { |
195 | status = Transaction.STATUS_OPEN; |
196 | } else { |
197 | status = Transaction.STATUS_COMMITTING; |
198 | } |
199 | name = null; |
200 | } else { |
201 | status = (Integer) data[0]; |
202 | name = (String) data[1]; |
203 | } |
204 | Transaction t = new Transaction(this, transactionId, status, |
205 | name, logId); |
206 | list.add(t); |
207 | key = undoLog.ceilingKey(getOperationId(transactionId + 1, 0)); |
208 | } |
209 | return list; |
210 | } |
211 | } |
212 | |
/**
 * Close the transaction store. This commits the underlying store; the
 * underlying store itself is not closed here.
 */
public synchronized void close() {
    store.commit();
}
219 | |
220 | /** |
221 | * Begin a new transaction. |
222 | * |
223 | * @return the transaction |
224 | */ |
225 | public synchronized Transaction begin() { |
226 | if (!init) { |
227 | throw DataUtils.newIllegalStateException( |
228 | DataUtils.ERROR_TRANSACTION_ILLEGAL_STATE, |
229 | "Not initialized"); |
230 | } |
231 | int transactionId = openTransactions.nextClearBit(1); |
232 | if (transactionId > maxTransactionId) { |
233 | throw DataUtils.newIllegalStateException( |
234 | DataUtils.ERROR_TOO_MANY_OPEN_TRANSACTIONS, |
235 | "There are {0} open transactions", |
236 | transactionId - 1); |
237 | } |
238 | openTransactions.set(transactionId); |
239 | int status = Transaction.STATUS_OPEN; |
240 | return new Transaction(this, transactionId, status, null, 0); |
241 | } |
242 | |
243 | /** |
244 | * Store a transaction. |
245 | * |
246 | * @param t the transaction |
247 | */ |
248 | synchronized void storeTransaction(Transaction t) { |
249 | if (t.getStatus() == Transaction.STATUS_PREPARED || |
250 | t.getName() != null) { |
251 | Object[] v = { t.getStatus(), t.getName() }; |
252 | preparedTransactions.put(t.getId(), v); |
253 | } |
254 | } |
255 | |
256 | /** |
257 | * Log an entry. |
258 | * |
259 | * @param t the transaction |
260 | * @param logId the log id |
261 | * @param mapId the map id |
262 | * @param key the key |
263 | * @param oldValue the old value |
264 | */ |
265 | void log(Transaction t, long logId, int mapId, |
266 | Object key, Object oldValue) { |
267 | Long undoKey = getOperationId(t.getId(), logId); |
268 | Object[] log = new Object[] { mapId, key, oldValue }; |
269 | synchronized (undoLog) { |
270 | if (logId == 0) { |
271 | if (undoLog.containsKey(undoKey)) { |
272 | throw DataUtils.newIllegalStateException( |
273 | DataUtils.ERROR_TOO_MANY_OPEN_TRANSACTIONS, |
274 | "An old transaction with the same id " + |
275 | "is still open: {0}", |
276 | t.getId()); |
277 | } |
278 | } |
279 | undoLog.put(undoKey, log); |
280 | } |
281 | } |
282 | |
283 | /** |
284 | * Remove a log entry. |
285 | * |
286 | * @param t the transaction |
287 | * @param logId the log id |
288 | */ |
289 | public void logUndo(Transaction t, long logId) { |
290 | Long undoKey = getOperationId(t.getId(), logId); |
291 | synchronized (undoLog) { |
292 | Object[] old = undoLog.remove(undoKey); |
293 | if (old == null) { |
294 | throw DataUtils.newIllegalStateException( |
295 | DataUtils.ERROR_TRANSACTION_ILLEGAL_STATE, |
296 | "Transaction {0} was concurrently rolled back", |
297 | t.getId()); |
298 | } |
299 | } |
300 | } |
301 | |
/**
 * Remove the given map, both from the cache of opened maps and from the
 * underlying store.
 *
 * @param <K> the key type
 * @param <V> the value type
 * @param map the map
 */
synchronized <K, V> void removeMap(TransactionMap<K, V> map) {
    maps.remove(map.mapId);
    store.removeMap(map.map);
}
313 | |
314 | /** |
315 | * Commit a transaction. |
316 | * |
317 | * @param t the transaction |
318 | * @param maxLogId the last log id |
319 | */ |
320 | void commit(Transaction t, long maxLogId) { |
321 | if (store.isClosed()) { |
322 | return; |
323 | } |
324 | // TODO could synchronize on blocks (100 at a time or so) |
325 | synchronized (undoLog) { |
326 | t.setStatus(Transaction.STATUS_COMMITTING); |
327 | for (long logId = 0; logId < maxLogId; logId++) { |
328 | Long undoKey = getOperationId(t.getId(), logId); |
329 | Object[] op = undoLog.get(undoKey); |
330 | if (op == null) { |
331 | // partially committed: load next |
332 | undoKey = undoLog.ceilingKey(undoKey); |
333 | if (undoKey == null || |
334 | getTransactionId(undoKey) != t.getId()) { |
335 | break; |
336 | } |
337 | logId = getLogId(undoKey) - 1; |
338 | continue; |
339 | } |
340 | int mapId = (Integer) op[0]; |
341 | MVMap<Object, VersionedValue> map = openMap(mapId); |
342 | if (map == null) { |
343 | // map was later removed |
344 | } else { |
345 | Object key = op[1]; |
346 | VersionedValue value = map.get(key); |
347 | if (value == null) { |
348 | // nothing to do |
349 | } else if (value.value == null) { |
350 | // remove the value |
351 | map.remove(key); |
352 | } else { |
353 | VersionedValue v2 = new VersionedValue(); |
354 | v2.value = value.value; |
355 | map.put(key, v2); |
356 | } |
357 | } |
358 | undoLog.remove(undoKey); |
359 | } |
360 | } |
361 | endTransaction(t); |
362 | } |
363 | |
364 | /** |
365 | * Open the map with the given name. |
366 | * |
367 | * @param <K> the key type |
368 | * @param name the map name |
369 | * @param keyType the key type |
370 | * @param valueType the value type |
371 | * @return the map |
372 | */ |
373 | synchronized <K> MVMap<K, VersionedValue> openMap(String name, |
374 | DataType keyType, DataType valueType) { |
375 | if (keyType == null) { |
376 | keyType = new ObjectDataType(); |
377 | } |
378 | if (valueType == null) { |
379 | valueType = new ObjectDataType(); |
380 | } |
381 | VersionedValueType vt = new VersionedValueType(valueType); |
382 | MVMap<K, VersionedValue> map; |
383 | MVMap.Builder<K, VersionedValue> builder = |
384 | new MVMap.Builder<K, VersionedValue>(). |
385 | keyType(keyType).valueType(vt); |
386 | map = store.openMap(name, builder); |
387 | @SuppressWarnings("unchecked") |
388 | MVMap<Object, VersionedValue> m = (MVMap<Object, VersionedValue>) map; |
389 | maps.put(map.getId(), m); |
390 | return map; |
391 | } |
392 | |
393 | /** |
394 | * Open the map with the given id. |
395 | * |
396 | * @param mapId the id |
397 | * @return the map |
398 | */ |
399 | synchronized MVMap<Object, VersionedValue> openMap(int mapId) { |
400 | MVMap<Object, VersionedValue> map = maps.get(mapId); |
401 | if (map != null) { |
402 | return map; |
403 | } |
404 | String mapName = store.getMapName(mapId); |
405 | if (mapName == null) { |
406 | // the map was removed later on |
407 | return null; |
408 | } |
409 | VersionedValueType vt = new VersionedValueType(dataType); |
410 | MVMap.Builder<Object, VersionedValue> mapBuilder = |
411 | new MVMap.Builder<Object, VersionedValue>(). |
412 | keyType(dataType).valueType(vt); |
413 | map = store.openMap(mapName, mapBuilder); |
414 | maps.put(mapId, map); |
415 | return map; |
416 | } |
417 | |
418 | /** |
419 | * Create a temporary map. Such maps are removed when opening the store. |
420 | * |
421 | * @return the map |
422 | */ |
423 | synchronized MVMap<Object, Integer> createTempMap() { |
424 | String mapName = "temp." + nextTempMapId++; |
425 | return openTempMap(mapName); |
426 | } |
427 | |
428 | /** |
429 | * Open a temporary map. |
430 | * |
431 | * @param mapName the map name |
432 | * @return the map |
433 | */ |
434 | MVMap<Object, Integer> openTempMap(String mapName) { |
435 | MVMap.Builder<Object, Integer> mapBuilder = |
436 | new MVMap.Builder<Object, Integer>(). |
437 | keyType(dataType); |
438 | return store.openMap(mapName, mapBuilder); |
439 | } |
440 | |
441 | /** |
442 | * End this transaction |
443 | * |
444 | * @param t the transaction |
445 | */ |
446 | synchronized void endTransaction(Transaction t) { |
447 | if (t.getStatus() == Transaction.STATUS_PREPARED) { |
448 | preparedTransactions.remove(t.getId()); |
449 | } |
450 | t.setStatus(Transaction.STATUS_CLOSED); |
451 | openTransactions.clear(t.transactionId); |
452 | if (store.getAutoCommitDelay() == 0) { |
453 | store.commit(); |
454 | return; |
455 | } |
456 | // to avoid having to store the transaction log, |
457 | // if there is no open transaction, |
458 | // and if there have been many changes, store them now |
459 | if (undoLog.isEmpty()) { |
460 | int unsaved = store.getUnsavedMemory(); |
461 | int max = store.getAutoCommitMemory(); |
462 | // save at 3/4 capacity |
463 | if (unsaved * 4 > max * 3) { |
464 | store.commit(); |
465 | } |
466 | } |
467 | } |
468 | |
469 | /** |
470 | * Rollback to an old savepoint. |
471 | * |
472 | * @param t the transaction |
473 | * @param maxLogId the last log id |
474 | * @param toLogId the log id to roll back to |
475 | */ |
476 | void rollbackTo(Transaction t, long maxLogId, long toLogId) { |
477 | // TODO could synchronize on blocks (100 at a time or so) |
478 | synchronized (undoLog) { |
479 | for (long logId = maxLogId - 1; logId >= toLogId; logId--) { |
480 | Long undoKey = getOperationId(t.getId(), logId); |
481 | Object[] op = undoLog.get(undoKey); |
482 | if (op == null) { |
483 | // partially rolled back: load previous |
484 | undoKey = undoLog.floorKey(undoKey); |
485 | if (undoKey == null || |
486 | getTransactionId(undoKey) != t.getId()) { |
487 | break; |
488 | } |
489 | logId = getLogId(undoKey) + 1; |
490 | continue; |
491 | } |
492 | int mapId = ((Integer) op[0]).intValue(); |
493 | MVMap<Object, VersionedValue> map = openMap(mapId); |
494 | if (map != null) { |
495 | Object key = op[1]; |
496 | VersionedValue oldValue = (VersionedValue) op[2]; |
497 | if (oldValue == null) { |
498 | // this transaction added the value |
499 | map.remove(key); |
500 | } else { |
501 | // this transaction updated the value |
502 | map.put(key, oldValue); |
503 | } |
504 | } |
505 | undoLog.remove(undoKey); |
506 | } |
507 | } |
508 | } |
509 | |
/**
 * Get the changes of the given transaction, starting from the latest log id
 * back to the given log id. The undo log is read lazily: the iterator
 * always holds the next change, and fetches the following one when the
 * current one is returned. Entries of maps that no longer exist are
 * skipped.
 *
 * @param t the transaction
 * @param maxLogId the maximum log id
 * @param toLogId the minimum log id
 * @return the changes
 */
Iterator<Change> getChanges(final Transaction t, final long maxLogId,
        final long toLogId) {
    return new Iterator<Change>() {

        // the log id of the next undo log entry to read
        private long logId = maxLogId - 1;
        // the change returned by the next call to next(), or null at the end
        private Change current;

        {
            // pre-fetch the first change (runs after logId is initialized)
            fetchNext();
        }

        /**
         * Read undo log entries (going backwards) until a change of an
         * existing map is found, and store it in "current". If there is
         * none, "current" is set to null.
         */
        private void fetchNext() {
            synchronized (undoLog) {
                while (logId >= toLogId) {
                    Long undoKey = getOperationId(t.getId(), logId);
                    Object[] op = undoLog.get(undoKey);
                    // move on before processing, so that the position
                    // is already correct when returning from the loop
                    logId--;
                    if (op == null) {
                        // partially rolled back: load previous
                        undoKey = undoLog.floorKey(undoKey);
                        if (undoKey == null ||
                                getTransactionId(undoKey) != t.getId()) {
                            break;
                        }
                        logId = getLogId(undoKey);
                        continue;
                    }
                    int mapId = ((Integer) op[0]).intValue();
                    MVMap<Object, VersionedValue> m = openMap(mapId);
                    if (m == null) {
                        // map was removed later on
                    } else {
                        current = new Change();
                        current.mapName = m.getName();
                        current.key = op[1];
                        // the change carries the value before the change
                        VersionedValue oldValue = (VersionedValue) op[2];
                        current.value = oldValue == null ?
                                null : oldValue.value;
                        return;
                    }
                }
            }
            // no more changes
            current = null;
        }

        @Override
        public boolean hasNext() {
            return current != null;
        }

        @Override
        public Change next() {
            if (current == null) {
                // note: reported via the "unsupported operation" factory
                throw DataUtils.newUnsupportedOperationException("no data");
            }
            Change result = current;
            fetchNext();
            return result;
        }

        @Override
        public void remove() {
            throw DataUtils.newUnsupportedOperationException("remove");
        }

    };
}
586 | |
/**
 * A change in a map, as recorded in the undo log.
 */
public static class Change {

    /**
     * The name of the map where the change occurred.
     */
    public String mapName;

    /**
     * The key.
     */
    public Object key;

    /**
     * The value before the change was applied, or null if the undo log
     * entry has no old value.
     */
    public Object value;
}
607 | |
608 | /** |
609 | * A transaction. |
610 | */ |
611 | public static class Transaction { |
612 | |
613 | /** |
614 | * The status of a closed transaction (committed or rolled back). |
615 | */ |
616 | public static final int STATUS_CLOSED = 0; |
617 | |
618 | /** |
619 | * The status of an open transaction. |
620 | */ |
621 | public static final int STATUS_OPEN = 1; |
622 | |
623 | /** |
624 | * The status of a prepared transaction. |
625 | */ |
626 | public static final int STATUS_PREPARED = 2; |
627 | |
628 | /** |
629 | * The status of a transaction that is being committed, but possibly not |
630 | * yet finished. A transactions can go into this state when the store is |
631 | * closed while the transaction is committing. When opening a store, |
632 | * such transactions should be committed. |
633 | */ |
634 | public static final int STATUS_COMMITTING = 3; |
635 | |
636 | /** |
637 | * The transaction store. |
638 | */ |
639 | final TransactionStore store; |
640 | |
641 | /** |
642 | * The transaction id. |
643 | */ |
644 | final int transactionId; |
645 | |
646 | /** |
647 | * The log id of the last entry in the undo log map. |
648 | */ |
649 | long logId; |
650 | |
651 | private int status; |
652 | |
653 | private String name; |
654 | |
655 | Transaction(TransactionStore store, int transactionId, int status, |
656 | String name, long logId) { |
657 | this.store = store; |
658 | this.transactionId = transactionId; |
659 | this.status = status; |
660 | this.name = name; |
661 | this.logId = logId; |
662 | } |
663 | |
664 | public int getId() { |
665 | return transactionId; |
666 | } |
667 | |
668 | public int getStatus() { |
669 | return status; |
670 | } |
671 | |
672 | void setStatus(int status) { |
673 | this.status = status; |
674 | } |
675 | |
676 | public void setName(String name) { |
677 | checkNotClosed(); |
678 | this.name = name; |
679 | store.storeTransaction(this); |
680 | } |
681 | |
682 | public String getName() { |
683 | return name; |
684 | } |
685 | |
686 | /** |
687 | * Create a new savepoint. |
688 | * |
689 | * @return the savepoint id |
690 | */ |
691 | public long setSavepoint() { |
692 | return logId; |
693 | } |
694 | |
695 | /** |
696 | * Add a log entry. |
697 | * |
698 | * @param mapId the map id |
699 | * @param key the key |
700 | * @param oldValue the old value |
701 | */ |
702 | void log(int mapId, Object key, Object oldValue) { |
703 | store.log(this, logId, mapId, key, oldValue); |
704 | // only increment the log id if logging was successful |
705 | logId++; |
706 | } |
707 | |
708 | /** |
709 | * Remove the last log entry. |
710 | */ |
711 | void logUndo() { |
712 | store.logUndo(this, --logId); |
713 | } |
714 | |
715 | /** |
716 | * Open a data map. |
717 | * |
718 | * @param <K> the key type |
719 | * @param <V> the value type |
720 | * @param name the name of the map |
721 | * @return the transaction map |
722 | */ |
723 | public <K, V> TransactionMap<K, V> openMap(String name) { |
724 | return openMap(name, null, null); |
725 | } |
726 | |
727 | /** |
728 | * Open the map to store the data. |
729 | * |
730 | * @param <K> the key type |
731 | * @param <V> the value type |
732 | * @param name the name of the map |
733 | * @param keyType the key data type |
734 | * @param valueType the value data type |
735 | * @return the transaction map |
736 | */ |
737 | public <K, V> TransactionMap<K, V> openMap(String name, |
738 | DataType keyType, DataType valueType) { |
739 | checkNotClosed(); |
740 | MVMap<K, VersionedValue> map = store.openMap(name, keyType, |
741 | valueType); |
742 | int mapId = map.getId(); |
743 | return new TransactionMap<K, V>(this, map, mapId); |
744 | } |
745 | |
746 | /** |
747 | * Open the transactional version of the given map. |
748 | * |
749 | * @param <K> the key type |
750 | * @param <V> the value type |
751 | * @param map the base map |
752 | * @return the transactional map |
753 | */ |
754 | public <K, V> TransactionMap<K, V> openMap( |
755 | MVMap<K, VersionedValue> map) { |
756 | checkNotClosed(); |
757 | int mapId = map.getId(); |
758 | return new TransactionMap<K, V>(this, map, mapId); |
759 | } |
760 | |
761 | /** |
762 | * Prepare the transaction. Afterwards, the transaction can only be |
763 | * committed or rolled back. |
764 | */ |
765 | public void prepare() { |
766 | checkNotClosed(); |
767 | status = STATUS_PREPARED; |
768 | store.storeTransaction(this); |
769 | } |
770 | |
771 | /** |
772 | * Commit the transaction. Afterwards, this transaction is closed. |
773 | */ |
774 | public void commit() { |
775 | checkNotClosed(); |
776 | store.commit(this, logId); |
777 | } |
778 | |
779 | /** |
780 | * Roll back to the given savepoint. This is only allowed if the |
781 | * transaction is open. |
782 | * |
783 | * @param savepointId the savepoint id |
784 | */ |
785 | public void rollbackToSavepoint(long savepointId) { |
786 | checkNotClosed(); |
787 | store.rollbackTo(this, logId, savepointId); |
788 | logId = savepointId; |
789 | } |
790 | |
791 | /** |
792 | * Roll the transaction back. Afterwards, this transaction is closed. |
793 | */ |
794 | public void rollback() { |
795 | checkNotClosed(); |
796 | store.rollbackTo(this, logId, 0); |
797 | store.endTransaction(this); |
798 | } |
799 | |
800 | /** |
801 | * Get the list of changes, starting with the latest change, up to the |
802 | * given savepoint (in reverse order than they occurred). The value of |
803 | * the change is the value before the change was applied. |
804 | * |
805 | * @param savepointId the savepoint id, 0 meaning the beginning of the |
806 | * transaction |
807 | * @return the changes |
808 | */ |
809 | public Iterator<Change> getChanges(long savepointId) { |
810 | return store.getChanges(this, logId, savepointId); |
811 | } |
812 | |
813 | /** |
814 | * Check whether this transaction is open or prepared. |
815 | */ |
816 | void checkNotClosed() { |
817 | if (status == STATUS_CLOSED) { |
818 | throw DataUtils.newIllegalStateException( |
819 | DataUtils.ERROR_CLOSED, "Transaction is closed"); |
820 | } |
821 | } |
822 | |
823 | /** |
824 | * Remove the map. |
825 | * |
826 | * @param map the map |
827 | */ |
828 | public <K, V> void removeMap(TransactionMap<K, V> map) { |
829 | store.removeMap(map); |
830 | } |
831 | |
832 | @Override |
833 | public String toString() { |
834 | return "" + transactionId; |
835 | } |
836 | |
837 | } |
838 | |
839 | /** |
840 | * A map that supports transactions. |
841 | * |
842 | * @param <K> the key type |
843 | * @param <V> the value type |
844 | */ |
845 | public static class TransactionMap<K, V> { |
846 | |
/**
 * The map id.
 */
final int mapId;

/**
 * If a record was read that was updated by this transaction, and the
 * update occurred before this log id, the older version is read. This
 * is so that changes are not immediately visible, to support statement
 * processing (for example "update test set id = id + 1").
 */
long readLogId = Long.MAX_VALUE;

/**
 * The map used for writing (the latest version).
 * <p>
 * Key: key the key of the data.
 * Value: a versioned value, which holds the id of the operation that
 * wrote it (0 for committed values) and the value itself.
 */
final MVMap<K, VersionedValue> map;

/**
 * The transaction this map belongs to.
 */
private Transaction transaction;
869 | |
/**
 * Create a transactional view on the given map.
 *
 * @param transaction the transaction
 * @param map the underlying map
 * @param mapId the id of the underlying map
 */
TransactionMap(Transaction transaction, MVMap<K, VersionedValue> map,
        int mapId) {
    this.transaction = transaction;
    this.map = map;
    this.mapId = mapId;
}
876 | |
/**
 * Set the savepoint. Afterwards, reads are based on the specified
 * savepoint (it is used as the read log id of this map).
 *
 * @param savepoint the savepoint
 */
public void setSavepoint(long savepoint) {
    this.readLogId = savepoint;
}
886 | |
887 | /** |
888 | * Get a clone of this map for the given transaction. |
889 | * |
890 | * @param transaction the transaction |
891 | * @param savepoint the savepoint |
892 | * @return the map |
893 | */ |
894 | public TransactionMap<K, V> getInstance(Transaction transaction, |
895 | long savepoint) { |
896 | TransactionMap<K, V> m = |
897 | new TransactionMap<K, V>(transaction, map, mapId); |
898 | m.setSavepoint(savepoint); |
899 | return m; |
900 | } |
901 | |
/**
 * Get the size of the raw map. This includes uncommitted entries, and
 * transiently removed entries, so it is the maximum number of entries.
 * The undo log is not consulted.
 *
 * @return the maximum size
 */
public long sizeAsLongMax() {
    return map.sizeAsLong();
}
911 | |
912 | /** |
913 | * Get the size of the map as seen by this transaction. |
914 | * |
915 | * @return the size |
916 | */ |
917 | public long sizeAsLong() { |
918 | long sizeRaw = map.sizeAsLong(); |
919 | MVMap<Long, Object[]> undo = transaction.store.undoLog; |
920 | long undoLogSize; |
921 | synchronized (undo) { |
922 | undoLogSize = undo.sizeAsLong(); |
923 | } |
924 | if (undoLogSize == 0) { |
925 | return sizeRaw; |
926 | } |
927 | if (undoLogSize > sizeRaw) { |
928 | // the undo log is larger than the map - |
929 | // count the entries of the map |
930 | long size = 0; |
931 | Cursor<K, VersionedValue> cursor = map.cursor(null); |
932 | while (cursor.hasNext()) { |
933 | K key = cursor.next(); |
934 | VersionedValue data = cursor.getValue(); |
935 | data = getValue(key, readLogId, data); |
936 | if (data != null && data.value != null) { |
937 | size++; |
938 | } |
939 | } |
940 | return size; |
941 | } |
942 | // the undo log is smaller than the map - |
943 | // scan the undo log and subtract invisible entries |
944 | synchronized (undo) { |
945 | // re-fetch in case any transaction was committed now |
946 | long size = map.sizeAsLong(); |
947 | MVMap<Object, Integer> temp = transaction.store.createTempMap(); |
948 | try { |
949 | for (Entry<Long, Object[]> e : undo.entrySet()) { |
950 | Object[] op = e.getValue(); |
951 | int m = (Integer) op[0]; |
952 | if (m != mapId) { |
953 | // a different map - ignore |
954 | continue; |
955 | } |
956 | @SuppressWarnings("unchecked") |
957 | K key = (K) op[1]; |
958 | if (get(key) == null) { |
959 | Integer old = temp.put(key, 1); |
960 | // count each key only once (there might be multiple |
961 | // changes for the same key) |
962 | if (old == null) { |
963 | size--; |
964 | } |
965 | } |
966 | } |
967 | } finally { |
968 | transaction.store.store.removeMap(temp); |
969 | } |
970 | return size; |
971 | } |
972 | } |
973 | |
/**
 * Remove an entry.
 * <p>
 * If the row is locked, this method will retry until the row could be
 * updated or until a lock timeout.
 *
 * @param key the key
 * @return the old value
 * @throws IllegalStateException if a lock timeout occurs
 */
public V remove(K key) {
    // removing is setting the value to null
    return set(key, null);
}
986 | |
/**
 * Update the value for the given key.
 * <p>
 * If the row is locked, this method will retry until the row could be
 * updated or until a lock timeout.
 *
 * @param key the key
 * @param value the new value (not null)
 * @return the old value
 * @throws IllegalStateException if a lock timeout occurs
 */
public V put(K key, V value) {
    // null is reserved for removing an entry
    DataUtils.checkArgument(value != null, "The value may not be null");
    return set(key, value);
}
1002 | |
1003 | /** |
1004 | * Update the value for the given key, without adding an undo log entry. |
1005 | * |
1006 | * @param key the key |
1007 | * @param value the value |
1008 | * @return the old value |
1009 | */ |
1010 | @SuppressWarnings("unchecked") |
1011 | public V putCommitted(K key, V value) { |
1012 | DataUtils.checkArgument(value != null, "The value may not be null"); |
1013 | VersionedValue newValue = new VersionedValue(); |
1014 | newValue.value = value; |
1015 | VersionedValue oldValue = map.put(key, newValue); |
1016 | return (V) (oldValue == null ? null : oldValue.value); |
1017 | } |
1018 | |
1019 | private V set(K key, V value) { |
1020 | transaction.checkNotClosed(); |
1021 | V old = get(key); |
1022 | boolean ok = trySet(key, value, false); |
1023 | if (ok) { |
1024 | return old; |
1025 | } |
1026 | throw DataUtils.newIllegalStateException( |
1027 | DataUtils.ERROR_TRANSACTION_LOCKED, "Entry is locked"); |
1028 | } |
1029 | |
/**
 * Try to remove the value for the given key.
 * <p>
 * This will fail if the row is locked by another transaction (that
 * means, if another open transaction changed the row).
 *
 * @param key the key
 * @return whether the entry could be removed
 */
public boolean tryRemove(K key) {
    // removing is setting the value to null
    return trySet(key, null, false);
}
1042 | |
/**
 * Try to update the value for the given key.
 * <p>
 * This will fail if the row is locked by another transaction (that
 * means, if another open transaction changed the row).
 *
 * @param key the key
 * @param value the new value (not null)
 * @return whether the entry could be updated
 */
public boolean tryPut(K key, V value) {
    // null is reserved for removing an entry
    DataUtils.checkArgument(value != null, "The value may not be null");
    return trySet(key, value, false);
}
1057 | |
1058 | /** |
1059 | * Try to set or remove the value. When updating only unchanged entries, |
1060 | * then the value is only changed if it was not changed after opening |
1061 | * the map. |
1062 | * |
1063 | * @param key the key |
1064 | * @param value the new value (null to remove the value) |
1065 | * @param onlyIfUnchanged only set the value if it was not changed (by |
1066 | * this or another transaction) since the map was opened |
1067 | * @return true if the value was set, false if there was a concurrent |
1068 | * update |
1069 | */ |
1070 | public boolean trySet(K key, V value, boolean onlyIfUnchanged) { |
1071 | VersionedValue current = map.get(key); |
1072 | if (onlyIfUnchanged) { |
1073 | VersionedValue old = getValue(key, readLogId); |
1074 | if (!map.areValuesEqual(old, current)) { |
1075 | long tx = getTransactionId(current.operationId); |
1076 | if (tx == transaction.transactionId) { |
1077 | if (value == null) { |
1078 | // ignore removing an entry |
1079 | // if it was added or changed |
1080 | // in the same statement |
1081 | return true; |
1082 | } else if (current.value == null) { |
1083 | // add an entry that was removed |
1084 | // in the same statement |
1085 | } else { |
1086 | return false; |
1087 | } |
1088 | } else { |
1089 | return false; |
1090 | } |
1091 | } |
1092 | } |
1093 | VersionedValue newValue = new VersionedValue(); |
1094 | newValue.operationId = getOperationId( |
1095 | transaction.transactionId, transaction.logId); |
1096 | newValue.value = value; |
1097 | if (current == null) { |
1098 | // a new value |
1099 | transaction.log(mapId, key, current); |
1100 | VersionedValue old = map.putIfAbsent(key, newValue); |
1101 | if (old != null) { |
1102 | transaction.logUndo(); |
1103 | return false; |
1104 | } |
1105 | return true; |
1106 | } |
1107 | long id = current.operationId; |
1108 | if (id == 0) { |
1109 | // committed |
1110 | transaction.log(mapId, key, current); |
1111 | // the transaction is committed: |
1112 | // overwrite the value |
1113 | if (!map.replace(key, current, newValue)) { |
1114 | // somebody else was faster |
1115 | transaction.logUndo(); |
1116 | return false; |
1117 | } |
1118 | return true; |
1119 | } |
1120 | int tx = getTransactionId(current.operationId); |
1121 | if (tx == transaction.transactionId) { |
1122 | // added or updated by this transaction |
1123 | transaction.log(mapId, key, current); |
1124 | if (!map.replace(key, current, newValue)) { |
1125 | // strange, somebody overwrote the value |
1126 | // even though the change was not committed |
1127 | transaction.logUndo(); |
1128 | return false; |
1129 | } |
1130 | return true; |
1131 | } |
1132 | // the transaction is not yet committed |
1133 | return false; |
1134 | } |
1135 | |
1136 | /** |
1137 | * Get the value for the given key at the time when this map was opened. |
1138 | * |
1139 | * @param key the key |
1140 | * @return the value or null |
1141 | */ |
1142 | public V get(K key) { |
1143 | return get(key, readLogId); |
1144 | } |
1145 | |
1146 | /** |
1147 | * Get the most recent value for the given key. |
1148 | * |
1149 | * @param key the key |
1150 | * @return the value or null |
1151 | */ |
1152 | public V getLatest(K key) { |
1153 | return get(key, Long.MAX_VALUE); |
1154 | } |
1155 | |
1156 | /** |
1157 | * Whether the map contains the key. |
1158 | * |
1159 | * @param key the key |
1160 | * @return true if the map contains an entry for this key |
1161 | */ |
1162 | public boolean containsKey(K key) { |
1163 | return get(key) != null; |
1164 | } |
1165 | |
1166 | /** |
1167 | * Get the value for the given key. |
1168 | * |
1169 | * @param key the key |
1170 | * @param maxLogId the maximum log id |
1171 | * @return the value or null |
1172 | */ |
1173 | @SuppressWarnings("unchecked") |
1174 | public V get(K key, long maxLogId) { |
1175 | VersionedValue data = getValue(key, maxLogId); |
1176 | return data == null ? null : (V) data.value; |
1177 | } |
1178 | |
1179 | /** |
1180 | * Whether the entry for this key was added or removed from this |
1181 | * session. |
1182 | * |
1183 | * @param key the key |
1184 | * @return true if yes |
1185 | */ |
1186 | public boolean isSameTransaction(K key) { |
1187 | VersionedValue data = map.get(key); |
1188 | if (data == null) { |
1189 | // doesn't exist or deleted by a committed transaction |
1190 | return false; |
1191 | } |
1192 | int tx = getTransactionId(data.operationId); |
1193 | return tx == transaction.transactionId; |
1194 | } |
1195 | |
1196 | private VersionedValue getValue(K key, long maxLog) { |
1197 | VersionedValue data = map.get(key); |
1198 | return getValue(key, maxLog, data); |
1199 | } |
1200 | |
1201 | /** |
1202 | * Get the versioned value for the given key. |
1203 | * |
1204 | * @param key the key |
1205 | * @param maxLog the maximum log id of the entry |
1206 | * @param data the value stored in the main map |
1207 | * @return the value |
1208 | */ |
1209 | VersionedValue getValue(K key, long maxLog, VersionedValue data) { |
1210 | while (true) { |
1211 | if (data == null) { |
1212 | // doesn't exist or deleted by a committed transaction |
1213 | return null; |
1214 | } |
1215 | long id = data.operationId; |
1216 | if (id == 0) { |
1217 | // it is committed |
1218 | return data; |
1219 | } |
1220 | int tx = getTransactionId(id); |
1221 | if (tx == transaction.transactionId) { |
1222 | // added by this transaction |
1223 | if (getLogId(id) < maxLog) { |
1224 | return data; |
1225 | } |
1226 | } |
1227 | // get the value before the uncommitted transaction |
1228 | Object[] d; |
1229 | synchronized (transaction.store.undoLog) { |
1230 | d = transaction.store.undoLog.get(id); |
1231 | } |
1232 | if (d == null) { |
1233 | // this entry should be committed or rolled back |
1234 | // in the meantime (the transaction might still be open) |
1235 | data = map.get(key); |
1236 | if (data != null && data.operationId == id) { |
1237 | // the transaction was not committed correctly |
1238 | throw DataUtils.newIllegalStateException( |
1239 | DataUtils.ERROR_TRANSACTION_CORRUPT, |
1240 | "The transaction log might be corrupt for key {0}", |
1241 | key); |
1242 | } |
1243 | } else { |
1244 | data = (VersionedValue) d[2]; |
1245 | } |
1246 | // verify this is either committed, |
1247 | // or the same transaction and earlier |
1248 | if (data != null) { |
1249 | long id2 = data.operationId; |
1250 | if (id2 != 0) { |
1251 | int tx2 = getTransactionId(id2); |
1252 | if (tx2 != tx) { |
1253 | // a different transaction - ok |
1254 | } else if (getLogId(id2) > getLogId(id)) { |
1255 | // newer than before |
1256 | break; |
1257 | } |
1258 | } |
1259 | } |
1260 | } |
1261 | throw DataUtils.newIllegalStateException( |
1262 | DataUtils.ERROR_TRANSACTION_CORRUPT, |
1263 | "The transaction log might be corrupt for key {0}", key); |
1264 | } |
1265 | |
1266 | /** |
1267 | * Check whether this map is closed. |
1268 | * |
1269 | * @return true if closed |
1270 | */ |
1271 | public boolean isClosed() { |
1272 | return map.isClosed(); |
1273 | } |
1274 | |
1275 | /** |
1276 | * Clear the map. |
1277 | */ |
1278 | public void clear() { |
1279 | // TODO truncate transactionally? |
1280 | map.clear(); |
1281 | } |
1282 | |
1283 | /** |
1284 | * Get the first key. |
1285 | * |
1286 | * @return the first key, or null if empty |
1287 | */ |
1288 | public K firstKey() { |
1289 | Iterator<K> it = keyIterator(null); |
1290 | return it.hasNext() ? it.next() : null; |
1291 | } |
1292 | |
1293 | /** |
1294 | * Get the last key. |
1295 | * |
1296 | * @return the last key, or null if empty |
1297 | */ |
1298 | public K lastKey() { |
1299 | K k = map.lastKey(); |
1300 | while (true) { |
1301 | if (k == null) { |
1302 | return null; |
1303 | } |
1304 | if (get(k) != null) { |
1305 | return k; |
1306 | } |
1307 | k = map.lowerKey(k); |
1308 | } |
1309 | } |
1310 | |
1311 | /** |
1312 | * Get the most recent smallest key that is larger or equal to this key. |
1313 | * |
1314 | * @param key the key (may not be null) |
1315 | * @return the result |
1316 | */ |
1317 | public K getLatestCeilingKey(K key) { |
1318 | Iterator<K> cursor = map.keyIterator(key); |
1319 | while (cursor.hasNext()) { |
1320 | key = cursor.next(); |
1321 | if (get(key, Long.MAX_VALUE) != null) { |
1322 | return key; |
1323 | } |
1324 | } |
1325 | return null; |
1326 | } |
1327 | |
1328 | /** |
1329 | * Get the smallest key that is larger than the given key, or null if no |
1330 | * such key exists. |
1331 | * |
1332 | * @param key the key (may not be null) |
1333 | * @return the result |
1334 | */ |
1335 | public K higherKey(K key) { |
1336 | while (true) { |
1337 | K k = map.higherKey(key); |
1338 | if (k == null || get(k) != null) { |
1339 | return k; |
1340 | } |
1341 | key = k; |
1342 | } |
1343 | } |
1344 | |
1345 | /** |
1346 | * Get one of the previous or next keys. There might be no value |
1347 | * available for the returned key. |
1348 | * |
1349 | * @param key the key (may not be null) |
1350 | * @param offset how many keys to skip (-1 for previous, 1 for next) |
1351 | * @return the key |
1352 | */ |
1353 | public K relativeKey(K key, long offset) { |
1354 | K k = offset > 0 ? map.ceilingKey(key) : map.floorKey(key); |
1355 | if (k == null) { |
1356 | return k; |
1357 | } |
1358 | long index = map.getKeyIndex(k); |
1359 | return map.getKey(index + offset); |
1360 | } |
1361 | |
1362 | /** |
1363 | * Get the largest key that is smaller than the given key, or null if no |
1364 | * such key exists. |
1365 | * |
1366 | * @param key the key (may not be null) |
1367 | * @return the result |
1368 | */ |
1369 | public K lowerKey(K key) { |
1370 | while (true) { |
1371 | K k = map.lowerKey(key); |
1372 | if (k == null || get(k) != null) { |
1373 | return k; |
1374 | } |
1375 | key = k; |
1376 | } |
1377 | } |
1378 | |
1379 | /** |
1380 | * Iterate over keys. |
1381 | * |
1382 | * @param from the first key to return |
1383 | * @return the iterator |
1384 | */ |
1385 | public Iterator<K> keyIterator(K from) { |
1386 | return keyIterator(from, false); |
1387 | } |
1388 | |
1389 | /** |
1390 | * Iterate over keys. |
1391 | * |
1392 | * @param from the first key to return |
1393 | * @param includeUncommitted whether uncommitted entries should be |
1394 | * included |
1395 | * @return the iterator |
1396 | */ |
1397 | public Iterator<K> keyIterator(final K from, final boolean includeUncommitted) { |
1398 | return new Iterator<K>() { |
1399 | private K currentKey = from; |
1400 | private Cursor<K, VersionedValue> cursor = map.cursor(currentKey); |
1401 | |
1402 | { |
1403 | fetchNext(); |
1404 | } |
1405 | |
1406 | private void fetchNext() { |
1407 | while (cursor.hasNext()) { |
1408 | K k; |
1409 | try { |
1410 | k = cursor.next(); |
1411 | } catch (IllegalStateException e) { |
1412 | // TODO this is a bit ugly |
1413 | if (DataUtils.getErrorCode(e.getMessage()) == |
1414 | DataUtils.ERROR_CHUNK_NOT_FOUND) { |
1415 | cursor = map.cursor(currentKey); |
1416 | // we (should) get the current key again, |
1417 | // we need to ignore that one |
1418 | if (!cursor.hasNext()) { |
1419 | break; |
1420 | } |
1421 | cursor.next(); |
1422 | if (!cursor.hasNext()) { |
1423 | break; |
1424 | } |
1425 | k = cursor.next(); |
1426 | } else { |
1427 | throw e; |
1428 | } |
1429 | } |
1430 | currentKey = k; |
1431 | if (includeUncommitted) { |
1432 | return; |
1433 | } |
1434 | if (containsKey(k)) { |
1435 | return; |
1436 | } |
1437 | } |
1438 | currentKey = null; |
1439 | } |
1440 | |
1441 | @Override |
1442 | public boolean hasNext() { |
1443 | return currentKey != null; |
1444 | } |
1445 | |
1446 | @Override |
1447 | public K next() { |
1448 | K result = currentKey; |
1449 | fetchNext(); |
1450 | return result; |
1451 | } |
1452 | |
1453 | @Override |
1454 | public void remove() { |
1455 | throw DataUtils.newUnsupportedOperationException( |
1456 | "Removing is not supported"); |
1457 | } |
1458 | }; |
1459 | } |
1460 | |
1461 | /** |
1462 | * Iterate over entries. |
1463 | * |
1464 | * @param from the first key to return |
1465 | * @return the iterator |
1466 | */ |
1467 | public Iterator<Entry<K, V>> entryIterator(final K from) { |
1468 | return new Iterator<Entry<K, V>>() { |
1469 | private Entry<K, V> current; |
1470 | private K currentKey = from; |
1471 | private Cursor<K, VersionedValue> cursor = map.cursor(currentKey); |
1472 | |
1473 | { |
1474 | fetchNext(); |
1475 | } |
1476 | |
1477 | private void fetchNext() { |
1478 | while (cursor.hasNext()) { |
1479 | K k; |
1480 | try { |
1481 | k = cursor.next(); |
1482 | } catch (IllegalStateException e) { |
1483 | // TODO this is a bit ugly |
1484 | if (DataUtils.getErrorCode(e.getMessage()) == |
1485 | DataUtils.ERROR_CHUNK_NOT_FOUND) { |
1486 | cursor = map.cursor(currentKey); |
1487 | // we (should) get the current key again, |
1488 | // we need to ignore that one |
1489 | if (!cursor.hasNext()) { |
1490 | break; |
1491 | } |
1492 | cursor.next(); |
1493 | if (!cursor.hasNext()) { |
1494 | break; |
1495 | } |
1496 | k = cursor.next(); |
1497 | } else { |
1498 | throw e; |
1499 | } |
1500 | } |
1501 | final K key = k; |
1502 | VersionedValue data = cursor.getValue(); |
1503 | data = getValue(key, readLogId, data); |
1504 | if (data != null && data.value != null) { |
1505 | @SuppressWarnings("unchecked") |
1506 | final V value = (V) data.value; |
1507 | current = new DataUtils.MapEntry<K, V>(key, value); |
1508 | currentKey = key; |
1509 | return; |
1510 | } |
1511 | } |
1512 | current = null; |
1513 | currentKey = null; |
1514 | } |
1515 | |
1516 | @Override |
1517 | public boolean hasNext() { |
1518 | return current != null; |
1519 | } |
1520 | |
1521 | @Override |
1522 | public Entry<K, V> next() { |
1523 | Entry<K, V> result = current; |
1524 | fetchNext(); |
1525 | return result; |
1526 | } |
1527 | |
1528 | @Override |
1529 | public void remove() { |
1530 | throw DataUtils.newUnsupportedOperationException( |
1531 | "Removing is not supported"); |
1532 | } |
1533 | }; |
1534 | |
1535 | } |
1536 | |
1537 | /** |
1538 | * Iterate over keys. |
1539 | * |
1540 | * @param iterator the iterator to wrap |
1541 | * @param includeUncommitted whether uncommitted entries should be |
1542 | * included |
1543 | * @return the iterator |
1544 | */ |
1545 | public Iterator<K> wrapIterator(final Iterator<K> iterator, |
1546 | final boolean includeUncommitted) { |
1547 | // TODO duplicate code for wrapIterator and entryIterator |
1548 | return new Iterator<K>() { |
1549 | private K current; |
1550 | |
1551 | { |
1552 | fetchNext(); |
1553 | } |
1554 | |
1555 | private void fetchNext() { |
1556 | while (iterator.hasNext()) { |
1557 | current = iterator.next(); |
1558 | if (includeUncommitted) { |
1559 | return; |
1560 | } |
1561 | if (containsKey(current)) { |
1562 | return; |
1563 | } |
1564 | } |
1565 | current = null; |
1566 | } |
1567 | |
1568 | @Override |
1569 | public boolean hasNext() { |
1570 | return current != null; |
1571 | } |
1572 | |
1573 | @Override |
1574 | public K next() { |
1575 | K result = current; |
1576 | fetchNext(); |
1577 | return result; |
1578 | } |
1579 | |
1580 | @Override |
1581 | public void remove() { |
1582 | throw DataUtils.newUnsupportedOperationException( |
1583 | "Removing is not supported"); |
1584 | } |
1585 | }; |
1586 | } |
1587 | |
1588 | public Transaction getTransaction() { |
1589 | return transaction; |
1590 | } |
1591 | |
1592 | public DataType getKeyType() { |
1593 | return map.getKeyType(); |
1594 | } |
1595 | |
1596 | } |
1597 | |
1598 | /** |
1599 | * A versioned value (possibly null). It contains a pointer to the old |
1600 | * value, and the value itself. |
1601 | */ |
1602 | static class VersionedValue { |
1603 | |
1604 | /** |
1605 | * The operation id. |
1606 | */ |
1607 | public long operationId; |
1608 | |
1609 | /** |
1610 | * The value. |
1611 | */ |
1612 | public Object value; |
1613 | |
1614 | @Override |
1615 | public String toString() { |
1616 | return value + (operationId == 0 ? "" : ( |
1617 | " " + |
1618 | getTransactionId(operationId) + "/" + |
1619 | getLogId(operationId))); |
1620 | } |
1621 | |
1622 | } |
1623 | |
1624 | /** |
1625 | * The value type for a versioned value. |
1626 | */ |
1627 | public static class VersionedValueType implements DataType { |
1628 | |
1629 | private final DataType valueType; |
1630 | |
1631 | VersionedValueType(DataType valueType) { |
1632 | this.valueType = valueType; |
1633 | } |
1634 | |
1635 | @Override |
1636 | public int getMemory(Object obj) { |
1637 | VersionedValue v = (VersionedValue) obj; |
1638 | return valueType.getMemory(v.value) + 8; |
1639 | } |
1640 | |
1641 | @Override |
1642 | public int compare(Object aObj, Object bObj) { |
1643 | if (aObj == bObj) { |
1644 | return 0; |
1645 | } |
1646 | VersionedValue a = (VersionedValue) aObj; |
1647 | VersionedValue b = (VersionedValue) bObj; |
1648 | long comp = a.operationId - b.operationId; |
1649 | if (comp == 0) { |
1650 | return valueType.compare(a.value, b.value); |
1651 | } |
1652 | return Long.signum(comp); |
1653 | } |
1654 | |
1655 | @Override |
1656 | public void read(ByteBuffer buff, Object[] obj, int len, boolean key) { |
1657 | if (buff.get() == 0) { |
1658 | // fast path (no op ids or null entries) |
1659 | for (int i = 0; i < len; i++) { |
1660 | VersionedValue v = new VersionedValue(); |
1661 | v.value = valueType.read(buff); |
1662 | obj[i] = v; |
1663 | } |
1664 | } else { |
1665 | // slow path (some entries may be null) |
1666 | for (int i = 0; i < len; i++) { |
1667 | obj[i] = read(buff); |
1668 | } |
1669 | } |
1670 | } |
1671 | |
1672 | @Override |
1673 | public Object read(ByteBuffer buff) { |
1674 | VersionedValue v = new VersionedValue(); |
1675 | v.operationId = DataUtils.readVarLong(buff); |
1676 | if (buff.get() == 1) { |
1677 | v.value = valueType.read(buff); |
1678 | } |
1679 | return v; |
1680 | } |
1681 | |
1682 | @Override |
1683 | public void write(WriteBuffer buff, Object[] obj, int len, boolean key) { |
1684 | boolean fastPath = true; |
1685 | for (int i = 0; i < len; i++) { |
1686 | VersionedValue v = (VersionedValue) obj[i]; |
1687 | if (v.operationId != 0 || v.value == null) { |
1688 | fastPath = false; |
1689 | } |
1690 | } |
1691 | if (fastPath) { |
1692 | buff.put((byte) 0); |
1693 | for (int i = 0; i < len; i++) { |
1694 | VersionedValue v = (VersionedValue) obj[i]; |
1695 | valueType.write(buff, v.value); |
1696 | } |
1697 | } else { |
1698 | // slow path: |
1699 | // store op ids, and some entries may be null |
1700 | buff.put((byte) 1); |
1701 | for (int i = 0; i < len; i++) { |
1702 | write(buff, obj[i]); |
1703 | } |
1704 | } |
1705 | } |
1706 | |
1707 | @Override |
1708 | public void write(WriteBuffer buff, Object obj) { |
1709 | VersionedValue v = (VersionedValue) obj; |
1710 | buff.putVarLong(v.operationId); |
1711 | if (v.value == null) { |
1712 | buff.put((byte) 0); |
1713 | } else { |
1714 | buff.put((byte) 1); |
1715 | valueType.write(buff, v.value); |
1716 | } |
1717 | } |
1718 | |
1719 | } |
1720 | |
1721 | /** |
1722 | * A data type that contains an array of objects with the specified data |
1723 | * types. |
1724 | */ |
1725 | public static class ArrayType implements DataType { |
1726 | |
1727 | private final int arrayLength; |
1728 | private final DataType[] elementTypes; |
1729 | |
1730 | ArrayType(DataType[] elementTypes) { |
1731 | this.arrayLength = elementTypes.length; |
1732 | this.elementTypes = elementTypes; |
1733 | } |
1734 | |
1735 | @Override |
1736 | public int getMemory(Object obj) { |
1737 | Object[] array = (Object[]) obj; |
1738 | int size = 0; |
1739 | for (int i = 0; i < arrayLength; i++) { |
1740 | DataType t = elementTypes[i]; |
1741 | Object o = array[i]; |
1742 | if (o != null) { |
1743 | size += t.getMemory(o); |
1744 | } |
1745 | } |
1746 | return size; |
1747 | } |
1748 | |
1749 | @Override |
1750 | public int compare(Object aObj, Object bObj) { |
1751 | if (aObj == bObj) { |
1752 | return 0; |
1753 | } |
1754 | Object[] a = (Object[]) aObj; |
1755 | Object[] b = (Object[]) bObj; |
1756 | for (int i = 0; i < arrayLength; i++) { |
1757 | DataType t = elementTypes[i]; |
1758 | int comp = t.compare(a[i], b[i]); |
1759 | if (comp != 0) { |
1760 | return comp; |
1761 | } |
1762 | } |
1763 | return 0; |
1764 | } |
1765 | |
1766 | @Override |
1767 | public void read(ByteBuffer buff, Object[] obj, |
1768 | int len, boolean key) { |
1769 | for (int i = 0; i < len; i++) { |
1770 | obj[i] = read(buff); |
1771 | } |
1772 | } |
1773 | |
1774 | @Override |
1775 | public void write(WriteBuffer buff, Object[] obj, |
1776 | int len, boolean key) { |
1777 | for (int i = 0; i < len; i++) { |
1778 | write(buff, obj[i]); |
1779 | } |
1780 | } |
1781 | |
1782 | @Override |
1783 | public void write(WriteBuffer buff, Object obj) { |
1784 | Object[] array = (Object[]) obj; |
1785 | for (int i = 0; i < arrayLength; i++) { |
1786 | DataType t = elementTypes[i]; |
1787 | Object o = array[i]; |
1788 | if (o == null) { |
1789 | buff.put((byte) 0); |
1790 | } else { |
1791 | buff.put((byte) 1); |
1792 | t.write(buff, o); |
1793 | } |
1794 | } |
1795 | } |
1796 | |
1797 | @Override |
1798 | public Object read(ByteBuffer buff) { |
1799 | Object[] array = new Object[arrayLength]; |
1800 | for (int i = 0; i < arrayLength; i++) { |
1801 | DataType t = elementTypes[i]; |
1802 | if (buff.get() == 1) { |
1803 | array[i] = t.read(buff); |
1804 | } |
1805 | } |
1806 | return array; |
1807 | } |
1808 | |
1809 | } |
1810 | |
1811 | } |