EMMA Coverage Report (generated Sun Mar 01 22:06:14 CET 2015)
[all classes][org.h2.server]

COVERAGE SUMMARY FOR SOURCE FILE [TcpServerThread.java]

name | class, % | method, % | block, % | line, %
TcpServerThread.java | 100% (2/2) | 85%  (17/20) | 83%  (1159/1392) | 85%  (284.9/334)

COVERAGE BREAKDOWN BY CLASS AND METHOD

name | class, % | method, % | block, % | line, %

class TcpServerThread$CachedInputStream | 100% (1/1) | 67%  (4/6) | 57%  (40/70) | 56%  (10/18)
read (): int |  | 0%   (0/1) | 0%   (0/14) | 0%   (0/4)
skip (long): long |  | 0%   (0/1) | 0%   (0/16) | 0%   (0/4)
<static initializer> |  | 100% (1/1) | 100% (7/7) | 100% (1/1)
TcpServerThread$CachedInputStream (InputStream): void |  | 100% (1/1) | 100% (13/13) | 100% (4/4)
getPos (): long |  | 100% (1/1) | 100% (3/3) | 100% (1/1)
read (byte [], int, int): int |  | 100% (1/1) | 100% (17/17) | 100% (4/4)

class TcpServerThread | 100% (1/1) | 93%  (13/14) | 85%  (1119/1322) | 87%  (274.9/316)
writeValue (Value): void |  | 0%   (0/1) | 0%   (0/35) | 0%   (0/8)
closeSession (): void |  | 100% (1/1) | 64%  (46/72) | 70%  (16.2/23)
close (): void |  | 100% (1/1) | 72%  (33/46) | 90%  (8.1/9)
sendRow (ResultInterface): void |  | 100% (1/1) | 76%  (32/42) | 80%  (8/10)
run (): void |  | 100% (1/1) | 78%  (234/300) | 88%  (60.7/69)
sendError (Throwable): void |  | 100% (1/1) | 92%  (61/66) | 89%  (16/18)
process (): void |  | 100% (1/1) | 93%  (610/658) | 91%  (139.8/153)
TcpServerThread (Socket, TcpServer, int): void |  | 100% (1/1) | 100% (33/33) | 100% (8/8)
cancelStatement (String, int): void |  | 100% (1/1) | 100% (15/15) | 100% (4/4)
getState (int): int |  | 100% (1/1) | 100% (9/9) | 100% (3/3)
getThread (): Thread |  | 100% (1/1) | 100% (3/3) | 100% (1/1)
setParameters (Command): void |  | 100% (1/1) | 100% (25/25) | 100% (6/6)
setThread (Thread): void |  | 100% (1/1) | 100% (4/4) | 100% (2/2)
trace (String): void |  | 100% (1/1) | 100% (14/14) | 100% (2/2)

1/*
2 * Copyright 2004-2014 H2 Group. Multiple-Licensed under the MPL 2.0,
3 * and the EPL 1.0 (http://h2database.com/html/license.html).
4 * Initial Developer: H2 Group
5 */
6package org.h2.server;
7 
8import java.io.ByteArrayInputStream;
9import java.io.FilterInputStream;
10import java.io.IOException;
11import java.io.InputStream;
12import java.io.PrintWriter;
13import java.io.StringWriter;
14import java.net.Socket;
15import java.sql.SQLException;
16import java.util.ArrayList;
17 
18import org.h2.api.ErrorCode;
19import org.h2.command.Command;
20import org.h2.engine.ConnectionInfo;
21import org.h2.engine.Constants;
22import org.h2.engine.Engine;
23import org.h2.engine.Session;
24import org.h2.engine.SessionRemote;
25import org.h2.engine.SysProperties;
26import org.h2.expression.Parameter;
27import org.h2.expression.ParameterInterface;
28import org.h2.expression.ParameterRemote;
29import org.h2.jdbc.JdbcSQLException;
30import org.h2.message.DbException;
31import org.h2.result.ResultColumn;
32import org.h2.result.ResultInterface;
33import org.h2.store.LobStorageInterface;
34import org.h2.util.IOUtils;
35import org.h2.util.SmallLRUCache;
36import org.h2.util.SmallMap;
37import org.h2.util.StringUtils;
38import org.h2.value.Transfer;
39import org.h2.value.Value;
40import org.h2.value.ValueLobDb;
41 
42/**
43 * One server thread is opened per client connection.
44 */
45public class TcpServerThread implements Runnable {
46 
47    protected final Transfer transfer;
48    private final TcpServer server;
49    private Session session;
50    private boolean stop;
51    private Thread thread;
52    private Command commit;
53    private final SmallMap cache =
54            new SmallMap(SysProperties.SERVER_CACHED_OBJECTS);
55    private final SmallLRUCache<Long, CachedInputStream> lobs =
56            SmallLRUCache.newInstance(Math.max(
57                SysProperties.SERVER_CACHED_OBJECTS,
58                SysProperties.SERVER_RESULT_SET_FETCH_SIZE * 5));
59    private final int threadId;
60    private int clientVersion;
61    private String sessionId;
62 
63    TcpServerThread(Socket socket, TcpServer server, int id) {
64        this.server = server;
65        this.threadId = id;
66        transfer = new Transfer(null);
67        transfer.setSocket(socket);
68    }
69 
70    private void trace(String s) {
71        server.trace(this + " " + s);
72    }
73 
74    @Override
75    public void run() {
76        try {
77            transfer.init();
78            trace("Connect");
79            // TODO server: should support a list of allowed databases
80            // and a list of allowed clients
81            try {
82                if (!server.allow(transfer.getSocket())) {
83                    throw DbException.get(ErrorCode.REMOTE_CONNECTION_NOT_ALLOWED);
84                }
85                int minClientVersion = transfer.readInt();
86                if (minClientVersion < Constants.TCP_PROTOCOL_VERSION_6) {
87                    throw DbException.get(ErrorCode.DRIVER_VERSION_ERROR_2,
88                            "" + clientVersion, "" + Constants.TCP_PROTOCOL_VERSION_6);
89                } else if (minClientVersion > Constants.TCP_PROTOCOL_VERSION_15) {
90                    throw DbException.get(ErrorCode.DRIVER_VERSION_ERROR_2,
91                            "" + clientVersion, "" + Constants.TCP_PROTOCOL_VERSION_15);
92                }
93                int maxClientVersion = transfer.readInt();
94                if (maxClientVersion >= Constants.TCP_PROTOCOL_VERSION_15) {
95                    clientVersion = Constants.TCP_PROTOCOL_VERSION_15;
96                } else {
97                    clientVersion = minClientVersion;
98                }
99                transfer.setVersion(clientVersion);
100                String db = transfer.readString();
101                String originalURL = transfer.readString();
102                if (db == null && originalURL == null) {
103                    String targetSessionId = transfer.readString();
104                    int command = transfer.readInt();
105                    stop = true;
106                    if (command == SessionRemote.SESSION_CANCEL_STATEMENT) {
107                        // cancel a running statement
108                        int statementId = transfer.readInt();
109                        server.cancelStatement(targetSessionId, statementId);
110                    } else if (command == SessionRemote.SESSION_CHECK_KEY) {
111                        // check if this is the correct server
112                        db = server.checkKeyAndGetDatabaseName(targetSessionId);
113                        if (!targetSessionId.equals(db)) {
114                            transfer.writeInt(SessionRemote.STATUS_OK);
115                        } else {
116                            transfer.writeInt(SessionRemote.STATUS_ERROR);
117                        }
118                    }
119                }
120                String baseDir = server.getBaseDir();
121                if (baseDir == null) {
122                    baseDir = SysProperties.getBaseDir();
123                }
124                db = server.checkKeyAndGetDatabaseName(db);
125                ConnectionInfo ci = new ConnectionInfo(db);
126                ci.setOriginalURL(originalURL);
127                ci.setUserName(transfer.readString());
128                ci.setUserPasswordHash(transfer.readBytes());
129                ci.setFilePasswordHash(transfer.readBytes());
130                int len = transfer.readInt();
131                for (int i = 0; i < len; i++) {
132                    ci.setProperty(transfer.readString(), transfer.readString());
133                }
134                // override client's requested properties with server settings
135                if (baseDir != null) {
136                    ci.setBaseDir(baseDir);
137                }
138                if (server.getIfExists()) {
139                    ci.setProperty("IFEXISTS", "TRUE");
140                }
141                transfer.writeInt(SessionRemote.STATUS_OK);
142                transfer.writeInt(clientVersion);
143                transfer.flush();
144                if (clientVersion >= Constants.TCP_PROTOCOL_VERSION_13) {
145                    if (ci.getFilePasswordHash() != null) {
146                        ci.setFileEncryptionKey(transfer.readBytes());
147                    }
148                }
149                session = Engine.getInstance().createSession(ci);
150                transfer.setSession(session);
151                server.addConnection(threadId, originalURL, ci.getUserName());
152                trace("Connected");
153            } catch (Throwable e) {
154                sendError(e);
155                stop = true;
156            }
157            while (!stop) {
158                try {
159                    process();
160                } catch (Throwable e) {
161                    sendError(e);
162                }
163            }
164            trace("Disconnect");
165        } catch (Throwable e) {
166            server.traceError(e);
167        } finally {
168            close();
169        }
170    }
171 
172    private void closeSession() {
173        if (session != null) {
174            RuntimeException closeError = null;
175            try {
176                Command rollback = session.prepareLocal("ROLLBACK");
177                rollback.executeUpdate();
178            } catch (RuntimeException e) {
179                closeError = e;
180                server.traceError(e);
181            } catch (Exception e) {
182                server.traceError(e);
183            }
184            try {
185                session.close();
186                server.removeConnection(threadId);
187            } catch (RuntimeException e) {
188                if (closeError == null) {
189                    closeError = e;
190                    server.traceError(e);
191                }
192            } catch (Exception e) {
193                server.traceError(e);
194            } finally {
195                session = null;
196            }
197            if (closeError != null) {
198                throw closeError;
199            }
200        }
201    }
202 
203    /**
204     * Close a connection.
205     */
206    void close() {
207        try {
208            stop = true;
209            closeSession();
210        } catch (Exception e) {
211            server.traceError(e);
212        } finally {
213            transfer.close();
214            trace("Close");
215            server.remove(this);
216        }
217    }
218 
219    private void sendError(Throwable t) {
220        try {
221            SQLException e = DbException.convert(t).getSQLException();
222            StringWriter writer = new StringWriter();
223            e.printStackTrace(new PrintWriter(writer));
224            String trace = writer.toString();
225            String message;
226            String sql;
227            if (e instanceof JdbcSQLException) {
228                JdbcSQLException j = (JdbcSQLException) e;
229                message = j.getOriginalMessage();
230                sql = j.getSQL();
231            } else {
232                message = e.getMessage();
233                sql = null;
234            }
235            transfer.writeInt(SessionRemote.STATUS_ERROR).
236                    writeString(e.getSQLState()).writeString(message).
237                    writeString(sql).writeInt(e.getErrorCode()).writeString(trace).flush();
238        } catch (Exception e2) {
239            if (!transfer.isClosed()) {
240                server.traceError(e2);
241            }
242            // if writing the error does not work, close the connection
243            stop = true;
244        }
245    }
246 
247    private void setParameters(Command command) throws IOException {
248        int len = transfer.readInt();
249        ArrayList<? extends ParameterInterface> params = command.getParameters();
250        for (int i = 0; i < len; i++) {
251            Parameter p = (Parameter) params.get(i);
252            p.setValue(transfer.readValue());
253        }
254    }
255 
256    private void process() throws IOException {
257        int operation = transfer.readInt();
258        switch (operation) {
259        case SessionRemote.SESSION_PREPARE_READ_PARAMS:
260        case SessionRemote.SESSION_PREPARE: {
261            int id = transfer.readInt();
262            String sql = transfer.readString();
263            int old = session.getModificationId();
264            Command command = session.prepareLocal(sql);
265            boolean readonly = command.isReadOnly();
266            cache.addObject(id, command);
267            boolean isQuery = command.isQuery();
268            ArrayList<? extends ParameterInterface> params = command.getParameters();
269            transfer.writeInt(getState(old)).writeBoolean(isQuery).
270                    writeBoolean(readonly).writeInt(params.size());
271            if (operation == SessionRemote.SESSION_PREPARE_READ_PARAMS) {
272                for (ParameterInterface p : params) {
273                    ParameterRemote.writeMetaData(transfer, p);
274                }
275            }
276            transfer.flush();
277            break;
278        }
279        case SessionRemote.SESSION_CLOSE: {
280            stop = true;
281            closeSession();
282            transfer.writeInt(SessionRemote.STATUS_OK).flush();
283            close();
284            break;
285        }
286        case SessionRemote.COMMAND_COMMIT: {
287            if (commit == null) {
288                commit = session.prepareLocal("COMMIT");
289            }
290            int old = session.getModificationId();
291            commit.executeUpdate();
292            transfer.writeInt(getState(old)).flush();
293            break;
294        }
295        case SessionRemote.COMMAND_GET_META_DATA: {
296            int id = transfer.readInt();
297            int objectId = transfer.readInt();
298            Command command = (Command) cache.getObject(id, false);
299            ResultInterface result = command.getMetaData();
300            cache.addObject(objectId, result);
301            int columnCount = result.getVisibleColumnCount();
302            transfer.writeInt(SessionRemote.STATUS_OK).
303                    writeInt(columnCount).writeInt(0);
304            for (int i = 0; i < columnCount; i++) {
305                ResultColumn.writeColumn(transfer, result, i);
306            }
307            transfer.flush();
308            break;
309        }
310        case SessionRemote.COMMAND_EXECUTE_QUERY: {
311            int id = transfer.readInt();
312            int objectId = transfer.readInt();
313            int maxRows = transfer.readInt();
314            int fetchSize = transfer.readInt();
315            Command command = (Command) cache.getObject(id, false);
316            setParameters(command);
317            int old = session.getModificationId();
318            ResultInterface result;
319            synchronized (session) {
320                result = command.executeQuery(maxRows, false);
321            }
322            cache.addObject(objectId, result);
323            int columnCount = result.getVisibleColumnCount();
324            int state = getState(old);
325            transfer.writeInt(state).writeInt(columnCount);
326            int rowCount = result.getRowCount();
327            transfer.writeInt(rowCount);
328            for (int i = 0; i < columnCount; i++) {
329                ResultColumn.writeColumn(transfer, result, i);
330            }
331            int fetch = Math.min(rowCount, fetchSize);
332            for (int i = 0; i < fetch; i++) {
333                sendRow(result);
334            }
335            transfer.flush();
336            break;
337        }
338        case SessionRemote.COMMAND_EXECUTE_UPDATE: {
339            int id = transfer.readInt();
340            Command command = (Command) cache.getObject(id, false);
341            setParameters(command);
342            int old = session.getModificationId();
343            int updateCount;
344            synchronized (session) {
345                updateCount = command.executeUpdate();
346            }
347            int status;
348            if (session.isClosed()) {
349                status = SessionRemote.STATUS_CLOSED;
350            } else {
351                status = getState(old);
352            }
353            transfer.writeInt(status).writeInt(updateCount).
354                    writeBoolean(session.getAutoCommit());
355            transfer.flush();
356            break;
357        }
358        case SessionRemote.COMMAND_CLOSE: {
359            int id = transfer.readInt();
360            Command command = (Command) cache.getObject(id, true);
361            if (command != null) {
362                command.close();
363                cache.freeObject(id);
364            }
365            break;
366        }
367        case SessionRemote.RESULT_FETCH_ROWS: {
368            int id = transfer.readInt();
369            int count = transfer.readInt();
370            ResultInterface result = (ResultInterface) cache.getObject(id, false);
371            transfer.writeInt(SessionRemote.STATUS_OK);
372            for (int i = 0; i < count; i++) {
373                sendRow(result);
374            }
375            transfer.flush();
376            break;
377        }
378        case SessionRemote.RESULT_RESET: {
379            int id = transfer.readInt();
380            ResultInterface result = (ResultInterface) cache.getObject(id, false);
381            result.reset();
382            break;
383        }
384        case SessionRemote.RESULT_CLOSE: {
385            int id = transfer.readInt();
386            ResultInterface result = (ResultInterface) cache.getObject(id, true);
387            if (result != null) {
388                result.close();
389                cache.freeObject(id);
390            }
391            break;
392        }
393        case SessionRemote.CHANGE_ID: {
394            int oldId = transfer.readInt();
395            int newId = transfer.readInt();
396            Object obj = cache.getObject(oldId, false);
397            cache.freeObject(oldId);
398            cache.addObject(newId, obj);
399            break;
400        }
401        case SessionRemote.SESSION_SET_ID: {
402            sessionId = transfer.readString();
403            transfer.writeInt(SessionRemote.STATUS_OK);
404            transfer.writeBoolean(session.getAutoCommit());
405            transfer.flush();
406            break;
407        }
408        case SessionRemote.SESSION_SET_AUTOCOMMIT: {
409            boolean autoCommit = transfer.readBoolean();
410            session.setAutoCommit(autoCommit);
411            transfer.writeInt(SessionRemote.STATUS_OK).flush();
412            break;
413        }
414        case SessionRemote.SESSION_HAS_PENDING_TRANSACTION: {
415            transfer.writeInt(SessionRemote.STATUS_OK).
416                writeInt(session.hasPendingTransaction() ? 1 : 0).flush();
417            break;
418        }
419        case SessionRemote.LOB_READ: {
420            long lobId = transfer.readLong();
421            byte[] hmac;
422            CachedInputStream in;
423            boolean verifyMac;
424            if (clientVersion >= Constants.TCP_PROTOCOL_VERSION_11) {
425                if (clientVersion >= Constants.TCP_PROTOCOL_VERSION_12) {
426                    hmac = transfer.readBytes();
427                    verifyMac = true;
428                } else {
429                    hmac = null;
430                    verifyMac = false;
431                }
432                in = lobs.get(lobId);
433                if (in == null && verifyMac) {
434                    in = new CachedInputStream(null);
435                    lobs.put(lobId, in);
436                }
437            } else {
438                verifyMac = false;
439                hmac = null;
440                in = lobs.get(lobId);
441            }
442            long offset = transfer.readLong();
443            int length = transfer.readInt();
444            if (verifyMac) {
445                transfer.verifyLobMac(hmac, lobId);
446            }
447            if (in == null) {
448                throw DbException.get(ErrorCode.OBJECT_CLOSED);
449            }
450            if (in.getPos() != offset) {
451                LobStorageInterface lobStorage = session.getDataHandler().getLobStorage();
452                // only the lob id is used
453                ValueLobDb lob = ValueLobDb.create(Value.BLOB, null, -1, lobId, hmac, -1);
454                InputStream lobIn = lobStorage.getInputStream(lob, hmac, -1);
455                in = new CachedInputStream(lobIn);
456                lobs.put(lobId, in);
457                lobIn.skip(offset);
458            }
459            // limit the buffer size
460            length = Math.min(16 * Constants.IO_BUFFER_SIZE, length);
461            byte[] buff = new byte[length];
462            length = IOUtils.readFully(in, buff, length);
463            transfer.writeInt(SessionRemote.STATUS_OK);
464            transfer.writeInt(length);
465            transfer.writeBytes(buff, 0, length);
466            transfer.flush();
467            break;
468        }
469        default:
470            trace("Unknown operation: " + operation);
471            closeSession();
472            close();
473        }
474    }
475 
476    private int getState(int oldModificationId) {
477        if (session.getModificationId() == oldModificationId) {
478            return SessionRemote.STATUS_OK;
479        }
480        return SessionRemote.STATUS_OK_STATE_CHANGED;
481    }
482 
483    private void sendRow(ResultInterface result) throws IOException {
484        if (result.next()) {
485            transfer.writeBoolean(true);
486            Value[] v = result.currentRow();
487            for (int i = 0; i < result.getVisibleColumnCount(); i++) {
488                if (clientVersion >= Constants.TCP_PROTOCOL_VERSION_12) {
489                    transfer.writeValue(v[i]);
490                } else {
491                    writeValue(v[i]);
492                }
493            }
494        } else {
495            transfer.writeBoolean(false);
496        }
497    }
498 
499    private void writeValue(Value v) throws IOException {
500        if (v.getType() == Value.CLOB || v.getType() == Value.BLOB) {
501            if (v instanceof ValueLobDb) {
502                ValueLobDb lob = (ValueLobDb) v;
503                if (lob.isStored()) {
504                    long id = lob.getLobId();
505                    lobs.put(id, new CachedInputStream(null));
506                }
507            }
508        }
509        transfer.writeValue(v);
510    }
511 
512    void setThread(Thread thread) {
513        this.thread = thread;
514    }
515 
516    Thread getThread() {
517        return thread;
518    }
519 
520    /**
521     * Cancel a running statement.
522     *
523     * @param targetSessionId the session id
524     * @param statementId the statement to cancel
525     */
526    void cancelStatement(String targetSessionId, int statementId) {
527        if (StringUtils.equals(targetSessionId, this.sessionId)) {
528            Command cmd = (Command) cache.getObject(statementId, false);
529            cmd.cancel();
530        }
531    }
532 
533    /**
534     * An input stream with a position.
535     */
536    static class CachedInputStream extends FilterInputStream {
537 
538        private static final ByteArrayInputStream DUMMY =
539                new ByteArrayInputStream(new byte[0]);
540        private long pos;
541 
542        CachedInputStream(InputStream in) {
543            super(in == null ? DUMMY : in);
544            if (in == null) {
545                pos = -1;
546            }
547        }
548 
549        @Override
550        public int read(byte[] buff, int off, int len) throws IOException {
551            len = super.read(buff, off, len);
552            if (len > 0) {
553                pos += len;
554            }
555            return len;
556        }
557 
558        @Override
559        public int read() throws IOException {
560            int x = in.read();
561            if (x >= 0) {
562                pos++;
563            }
564            return x;
565        }
566 
567        @Override
568        public long skip(long n) throws IOException {
569            n = super.skip(n);
570            if (n > 0) {
571                pos += n;
572            }
573            return n;
574        }
575 
576        public long getPos() {
577            return pos;
578        }
579 
580    }
581 
582}

[all classes][org.h2.server]
EMMA 2.0.5312 (C) Vladimir Roubtsov