1 | /* |
2 | * Copyright 2004-2014 H2 Group. Multiple-Licensed under the MPL 2.0, |
3 | * and the EPL 1.0 (http://h2database.com/html/license.html). |
4 | * Initial Developer: H2 Group |
5 | */ |
6 | package org.h2.server; |
7 | |
8 | import java.io.ByteArrayInputStream; |
9 | import java.io.FilterInputStream; |
10 | import java.io.IOException; |
11 | import java.io.InputStream; |
12 | import java.io.PrintWriter; |
13 | import java.io.StringWriter; |
14 | import java.net.Socket; |
15 | import java.sql.SQLException; |
16 | import java.util.ArrayList; |
17 | |
18 | import org.h2.api.ErrorCode; |
19 | import org.h2.command.Command; |
20 | import org.h2.engine.ConnectionInfo; |
21 | import org.h2.engine.Constants; |
22 | import org.h2.engine.Engine; |
23 | import org.h2.engine.Session; |
24 | import org.h2.engine.SessionRemote; |
25 | import org.h2.engine.SysProperties; |
26 | import org.h2.expression.Parameter; |
27 | import org.h2.expression.ParameterInterface; |
28 | import org.h2.expression.ParameterRemote; |
29 | import org.h2.jdbc.JdbcSQLException; |
30 | import org.h2.message.DbException; |
31 | import org.h2.result.ResultColumn; |
32 | import org.h2.result.ResultInterface; |
33 | import org.h2.store.LobStorageInterface; |
34 | import org.h2.util.IOUtils; |
35 | import org.h2.util.SmallLRUCache; |
36 | import org.h2.util.SmallMap; |
37 | import org.h2.util.StringUtils; |
38 | import org.h2.value.Transfer; |
39 | import org.h2.value.Value; |
40 | import org.h2.value.ValueLobDb; |
41 | |
42 | /** |
43 | * One server thread is opened per client connection. |
44 | */ |
45 | public class TcpServerThread implements Runnable { |
46 | |
47 | protected final Transfer transfer; |
48 | private final TcpServer server; |
49 | private Session session; |
50 | private boolean stop; |
51 | private Thread thread; |
52 | private Command commit; |
53 | private final SmallMap cache = |
54 | new SmallMap(SysProperties.SERVER_CACHED_OBJECTS); |
55 | private final SmallLRUCache<Long, CachedInputStream> lobs = |
56 | SmallLRUCache.newInstance(Math.max( |
57 | SysProperties.SERVER_CACHED_OBJECTS, |
58 | SysProperties.SERVER_RESULT_SET_FETCH_SIZE * 5)); |
59 | private final int threadId; |
60 | private int clientVersion; |
61 | private String sessionId; |
62 | |
/**
 * Creates the handler for one accepted client socket.
 *
 * @param socket the socket connected to the client
 * @param server the server that accepted the connection
 * @param id the id of this thread within the server
 */
TcpServerThread(Socket socket, TcpServer server, int id) {
    this.threadId = id;
    this.server = server;
    this.transfer = new Transfer(null);
    this.transfer.setSocket(socket);
}
69 | |
70 | private void trace(String s) { |
71 | server.trace(this + " " + s); |
72 | } |
73 | |
74 | @Override |
75 | public void run() { |
76 | try { |
77 | transfer.init(); |
78 | trace("Connect"); |
79 | // TODO server: should support a list of allowed databases |
80 | // and a list of allowed clients |
81 | try { |
82 | if (!server.allow(transfer.getSocket())) { |
83 | throw DbException.get(ErrorCode.REMOTE_CONNECTION_NOT_ALLOWED); |
84 | } |
85 | int minClientVersion = transfer.readInt(); |
86 | if (minClientVersion < Constants.TCP_PROTOCOL_VERSION_6) { |
87 | throw DbException.get(ErrorCode.DRIVER_VERSION_ERROR_2, |
88 | "" + clientVersion, "" + Constants.TCP_PROTOCOL_VERSION_6); |
89 | } else if (minClientVersion > Constants.TCP_PROTOCOL_VERSION_15) { |
90 | throw DbException.get(ErrorCode.DRIVER_VERSION_ERROR_2, |
91 | "" + clientVersion, "" + Constants.TCP_PROTOCOL_VERSION_15); |
92 | } |
93 | int maxClientVersion = transfer.readInt(); |
94 | if (maxClientVersion >= Constants.TCP_PROTOCOL_VERSION_15) { |
95 | clientVersion = Constants.TCP_PROTOCOL_VERSION_15; |
96 | } else { |
97 | clientVersion = minClientVersion; |
98 | } |
99 | transfer.setVersion(clientVersion); |
100 | String db = transfer.readString(); |
101 | String originalURL = transfer.readString(); |
102 | if (db == null && originalURL == null) { |
103 | String targetSessionId = transfer.readString(); |
104 | int command = transfer.readInt(); |
105 | stop = true; |
106 | if (command == SessionRemote.SESSION_CANCEL_STATEMENT) { |
107 | // cancel a running statement |
108 | int statementId = transfer.readInt(); |
109 | server.cancelStatement(targetSessionId, statementId); |
110 | } else if (command == SessionRemote.SESSION_CHECK_KEY) { |
111 | // check if this is the correct server |
112 | db = server.checkKeyAndGetDatabaseName(targetSessionId); |
113 | if (!targetSessionId.equals(db)) { |
114 | transfer.writeInt(SessionRemote.STATUS_OK); |
115 | } else { |
116 | transfer.writeInt(SessionRemote.STATUS_ERROR); |
117 | } |
118 | } |
119 | } |
120 | String baseDir = server.getBaseDir(); |
121 | if (baseDir == null) { |
122 | baseDir = SysProperties.getBaseDir(); |
123 | } |
124 | db = server.checkKeyAndGetDatabaseName(db); |
125 | ConnectionInfo ci = new ConnectionInfo(db); |
126 | ci.setOriginalURL(originalURL); |
127 | ci.setUserName(transfer.readString()); |
128 | ci.setUserPasswordHash(transfer.readBytes()); |
129 | ci.setFilePasswordHash(transfer.readBytes()); |
130 | int len = transfer.readInt(); |
131 | for (int i = 0; i < len; i++) { |
132 | ci.setProperty(transfer.readString(), transfer.readString()); |
133 | } |
134 | // override client's requested properties with server settings |
135 | if (baseDir != null) { |
136 | ci.setBaseDir(baseDir); |
137 | } |
138 | if (server.getIfExists()) { |
139 | ci.setProperty("IFEXISTS", "TRUE"); |
140 | } |
141 | transfer.writeInt(SessionRemote.STATUS_OK); |
142 | transfer.writeInt(clientVersion); |
143 | transfer.flush(); |
144 | if (clientVersion >= Constants.TCP_PROTOCOL_VERSION_13) { |
145 | if (ci.getFilePasswordHash() != null) { |
146 | ci.setFileEncryptionKey(transfer.readBytes()); |
147 | } |
148 | } |
149 | session = Engine.getInstance().createSession(ci); |
150 | transfer.setSession(session); |
151 | server.addConnection(threadId, originalURL, ci.getUserName()); |
152 | trace("Connected"); |
153 | } catch (Throwable e) { |
154 | sendError(e); |
155 | stop = true; |
156 | } |
157 | while (!stop) { |
158 | try { |
159 | process(); |
160 | } catch (Throwable e) { |
161 | sendError(e); |
162 | } |
163 | } |
164 | trace("Disconnect"); |
165 | } catch (Throwable e) { |
166 | server.traceError(e); |
167 | } finally { |
168 | close(); |
169 | } |
170 | } |
171 | |
172 | private void closeSession() { |
173 | if (session != null) { |
174 | RuntimeException closeError = null; |
175 | try { |
176 | Command rollback = session.prepareLocal("ROLLBACK"); |
177 | rollback.executeUpdate(); |
178 | } catch (RuntimeException e) { |
179 | closeError = e; |
180 | server.traceError(e); |
181 | } catch (Exception e) { |
182 | server.traceError(e); |
183 | } |
184 | try { |
185 | session.close(); |
186 | server.removeConnection(threadId); |
187 | } catch (RuntimeException e) { |
188 | if (closeError == null) { |
189 | closeError = e; |
190 | server.traceError(e); |
191 | } |
192 | } catch (Exception e) { |
193 | server.traceError(e); |
194 | } finally { |
195 | session = null; |
196 | } |
197 | if (closeError != null) { |
198 | throw closeError; |
199 | } |
200 | } |
201 | } |
202 | |
203 | /** |
204 | * Close a connection. |
205 | */ |
206 | void close() { |
207 | try { |
208 | stop = true; |
209 | closeSession(); |
210 | } catch (Exception e) { |
211 | server.traceError(e); |
212 | } finally { |
213 | transfer.close(); |
214 | trace("Close"); |
215 | server.remove(this); |
216 | } |
217 | } |
218 | |
219 | private void sendError(Throwable t) { |
220 | try { |
221 | SQLException e = DbException.convert(t).getSQLException(); |
222 | StringWriter writer = new StringWriter(); |
223 | e.printStackTrace(new PrintWriter(writer)); |
224 | String trace = writer.toString(); |
225 | String message; |
226 | String sql; |
227 | if (e instanceof JdbcSQLException) { |
228 | JdbcSQLException j = (JdbcSQLException) e; |
229 | message = j.getOriginalMessage(); |
230 | sql = j.getSQL(); |
231 | } else { |
232 | message = e.getMessage(); |
233 | sql = null; |
234 | } |
235 | transfer.writeInt(SessionRemote.STATUS_ERROR). |
236 | writeString(e.getSQLState()).writeString(message). |
237 | writeString(sql).writeInt(e.getErrorCode()).writeString(trace).flush(); |
238 | } catch (Exception e2) { |
239 | if (!transfer.isClosed()) { |
240 | server.traceError(e2); |
241 | } |
242 | // if writing the error does not work, close the connection |
243 | stop = true; |
244 | } |
245 | } |
246 | |
247 | private void setParameters(Command command) throws IOException { |
248 | int len = transfer.readInt(); |
249 | ArrayList<? extends ParameterInterface> params = command.getParameters(); |
250 | for (int i = 0; i < len; i++) { |
251 | Parameter p = (Parameter) params.get(i); |
252 | p.setValue(transfer.readValue()); |
253 | } |
254 | } |
255 | |
256 | private void process() throws IOException { |
257 | int operation = transfer.readInt(); |
258 | switch (operation) { |
259 | case SessionRemote.SESSION_PREPARE_READ_PARAMS: |
260 | case SessionRemote.SESSION_PREPARE: { |
261 | int id = transfer.readInt(); |
262 | String sql = transfer.readString(); |
263 | int old = session.getModificationId(); |
264 | Command command = session.prepareLocal(sql); |
265 | boolean readonly = command.isReadOnly(); |
266 | cache.addObject(id, command); |
267 | boolean isQuery = command.isQuery(); |
268 | ArrayList<? extends ParameterInterface> params = command.getParameters(); |
269 | transfer.writeInt(getState(old)).writeBoolean(isQuery). |
270 | writeBoolean(readonly).writeInt(params.size()); |
271 | if (operation == SessionRemote.SESSION_PREPARE_READ_PARAMS) { |
272 | for (ParameterInterface p : params) { |
273 | ParameterRemote.writeMetaData(transfer, p); |
274 | } |
275 | } |
276 | transfer.flush(); |
277 | break; |
278 | } |
279 | case SessionRemote.SESSION_CLOSE: { |
280 | stop = true; |
281 | closeSession(); |
282 | transfer.writeInt(SessionRemote.STATUS_OK).flush(); |
283 | close(); |
284 | break; |
285 | } |
286 | case SessionRemote.COMMAND_COMMIT: { |
287 | if (commit == null) { |
288 | commit = session.prepareLocal("COMMIT"); |
289 | } |
290 | int old = session.getModificationId(); |
291 | commit.executeUpdate(); |
292 | transfer.writeInt(getState(old)).flush(); |
293 | break; |
294 | } |
295 | case SessionRemote.COMMAND_GET_META_DATA: { |
296 | int id = transfer.readInt(); |
297 | int objectId = transfer.readInt(); |
298 | Command command = (Command) cache.getObject(id, false); |
299 | ResultInterface result = command.getMetaData(); |
300 | cache.addObject(objectId, result); |
301 | int columnCount = result.getVisibleColumnCount(); |
302 | transfer.writeInt(SessionRemote.STATUS_OK). |
303 | writeInt(columnCount).writeInt(0); |
304 | for (int i = 0; i < columnCount; i++) { |
305 | ResultColumn.writeColumn(transfer, result, i); |
306 | } |
307 | transfer.flush(); |
308 | break; |
309 | } |
310 | case SessionRemote.COMMAND_EXECUTE_QUERY: { |
311 | int id = transfer.readInt(); |
312 | int objectId = transfer.readInt(); |
313 | int maxRows = transfer.readInt(); |
314 | int fetchSize = transfer.readInt(); |
315 | Command command = (Command) cache.getObject(id, false); |
316 | setParameters(command); |
317 | int old = session.getModificationId(); |
318 | ResultInterface result; |
319 | synchronized (session) { |
320 | result = command.executeQuery(maxRows, false); |
321 | } |
322 | cache.addObject(objectId, result); |
323 | int columnCount = result.getVisibleColumnCount(); |
324 | int state = getState(old); |
325 | transfer.writeInt(state).writeInt(columnCount); |
326 | int rowCount = result.getRowCount(); |
327 | transfer.writeInt(rowCount); |
328 | for (int i = 0; i < columnCount; i++) { |
329 | ResultColumn.writeColumn(transfer, result, i); |
330 | } |
331 | int fetch = Math.min(rowCount, fetchSize); |
332 | for (int i = 0; i < fetch; i++) { |
333 | sendRow(result); |
334 | } |
335 | transfer.flush(); |
336 | break; |
337 | } |
338 | case SessionRemote.COMMAND_EXECUTE_UPDATE: { |
339 | int id = transfer.readInt(); |
340 | Command command = (Command) cache.getObject(id, false); |
341 | setParameters(command); |
342 | int old = session.getModificationId(); |
343 | int updateCount; |
344 | synchronized (session) { |
345 | updateCount = command.executeUpdate(); |
346 | } |
347 | int status; |
348 | if (session.isClosed()) { |
349 | status = SessionRemote.STATUS_CLOSED; |
350 | } else { |
351 | status = getState(old); |
352 | } |
353 | transfer.writeInt(status).writeInt(updateCount). |
354 | writeBoolean(session.getAutoCommit()); |
355 | transfer.flush(); |
356 | break; |
357 | } |
358 | case SessionRemote.COMMAND_CLOSE: { |
359 | int id = transfer.readInt(); |
360 | Command command = (Command) cache.getObject(id, true); |
361 | if (command != null) { |
362 | command.close(); |
363 | cache.freeObject(id); |
364 | } |
365 | break; |
366 | } |
367 | case SessionRemote.RESULT_FETCH_ROWS: { |
368 | int id = transfer.readInt(); |
369 | int count = transfer.readInt(); |
370 | ResultInterface result = (ResultInterface) cache.getObject(id, false); |
371 | transfer.writeInt(SessionRemote.STATUS_OK); |
372 | for (int i = 0; i < count; i++) { |
373 | sendRow(result); |
374 | } |
375 | transfer.flush(); |
376 | break; |
377 | } |
378 | case SessionRemote.RESULT_RESET: { |
379 | int id = transfer.readInt(); |
380 | ResultInterface result = (ResultInterface) cache.getObject(id, false); |
381 | result.reset(); |
382 | break; |
383 | } |
384 | case SessionRemote.RESULT_CLOSE: { |
385 | int id = transfer.readInt(); |
386 | ResultInterface result = (ResultInterface) cache.getObject(id, true); |
387 | if (result != null) { |
388 | result.close(); |
389 | cache.freeObject(id); |
390 | } |
391 | break; |
392 | } |
393 | case SessionRemote.CHANGE_ID: { |
394 | int oldId = transfer.readInt(); |
395 | int newId = transfer.readInt(); |
396 | Object obj = cache.getObject(oldId, false); |
397 | cache.freeObject(oldId); |
398 | cache.addObject(newId, obj); |
399 | break; |
400 | } |
401 | case SessionRemote.SESSION_SET_ID: { |
402 | sessionId = transfer.readString(); |
403 | transfer.writeInt(SessionRemote.STATUS_OK); |
404 | transfer.writeBoolean(session.getAutoCommit()); |
405 | transfer.flush(); |
406 | break; |
407 | } |
408 | case SessionRemote.SESSION_SET_AUTOCOMMIT: { |
409 | boolean autoCommit = transfer.readBoolean(); |
410 | session.setAutoCommit(autoCommit); |
411 | transfer.writeInt(SessionRemote.STATUS_OK).flush(); |
412 | break; |
413 | } |
414 | case SessionRemote.SESSION_HAS_PENDING_TRANSACTION: { |
415 | transfer.writeInt(SessionRemote.STATUS_OK). |
416 | writeInt(session.hasPendingTransaction() ? 1 : 0).flush(); |
417 | break; |
418 | } |
419 | case SessionRemote.LOB_READ: { |
420 | long lobId = transfer.readLong(); |
421 | byte[] hmac; |
422 | CachedInputStream in; |
423 | boolean verifyMac; |
424 | if (clientVersion >= Constants.TCP_PROTOCOL_VERSION_11) { |
425 | if (clientVersion >= Constants.TCP_PROTOCOL_VERSION_12) { |
426 | hmac = transfer.readBytes(); |
427 | verifyMac = true; |
428 | } else { |
429 | hmac = null; |
430 | verifyMac = false; |
431 | } |
432 | in = lobs.get(lobId); |
433 | if (in == null && verifyMac) { |
434 | in = new CachedInputStream(null); |
435 | lobs.put(lobId, in); |
436 | } |
437 | } else { |
438 | verifyMac = false; |
439 | hmac = null; |
440 | in = lobs.get(lobId); |
441 | } |
442 | long offset = transfer.readLong(); |
443 | int length = transfer.readInt(); |
444 | if (verifyMac) { |
445 | transfer.verifyLobMac(hmac, lobId); |
446 | } |
447 | if (in == null) { |
448 | throw DbException.get(ErrorCode.OBJECT_CLOSED); |
449 | } |
450 | if (in.getPos() != offset) { |
451 | LobStorageInterface lobStorage = session.getDataHandler().getLobStorage(); |
452 | // only the lob id is used |
453 | ValueLobDb lob = ValueLobDb.create(Value.BLOB, null, -1, lobId, hmac, -1); |
454 | InputStream lobIn = lobStorage.getInputStream(lob, hmac, -1); |
455 | in = new CachedInputStream(lobIn); |
456 | lobs.put(lobId, in); |
457 | lobIn.skip(offset); |
458 | } |
459 | // limit the buffer size |
460 | length = Math.min(16 * Constants.IO_BUFFER_SIZE, length); |
461 | byte[] buff = new byte[length]; |
462 | length = IOUtils.readFully(in, buff, length); |
463 | transfer.writeInt(SessionRemote.STATUS_OK); |
464 | transfer.writeInt(length); |
465 | transfer.writeBytes(buff, 0, length); |
466 | transfer.flush(); |
467 | break; |
468 | } |
469 | default: |
470 | trace("Unknown operation: " + operation); |
471 | closeSession(); |
472 | close(); |
473 | } |
474 | } |
475 | |
476 | private int getState(int oldModificationId) { |
477 | if (session.getModificationId() == oldModificationId) { |
478 | return SessionRemote.STATUS_OK; |
479 | } |
480 | return SessionRemote.STATUS_OK_STATE_CHANGED; |
481 | } |
482 | |
483 | private void sendRow(ResultInterface result) throws IOException { |
484 | if (result.next()) { |
485 | transfer.writeBoolean(true); |
486 | Value[] v = result.currentRow(); |
487 | for (int i = 0; i < result.getVisibleColumnCount(); i++) { |
488 | if (clientVersion >= Constants.TCP_PROTOCOL_VERSION_12) { |
489 | transfer.writeValue(v[i]); |
490 | } else { |
491 | writeValue(v[i]); |
492 | } |
493 | } |
494 | } else { |
495 | transfer.writeBoolean(false); |
496 | } |
497 | } |
498 | |
499 | private void writeValue(Value v) throws IOException { |
500 | if (v.getType() == Value.CLOB || v.getType() == Value.BLOB) { |
501 | if (v instanceof ValueLobDb) { |
502 | ValueLobDb lob = (ValueLobDb) v; |
503 | if (lob.isStored()) { |
504 | long id = lob.getLobId(); |
505 | lobs.put(id, new CachedInputStream(null)); |
506 | } |
507 | } |
508 | } |
509 | transfer.writeValue(v); |
510 | } |
511 | |
512 | void setThread(Thread thread) { |
513 | this.thread = thread; |
514 | } |
515 | |
516 | Thread getThread() { |
517 | return thread; |
518 | } |
519 | |
520 | /** |
521 | * Cancel a running statement. |
522 | * |
523 | * @param targetSessionId the session id |
524 | * @param statementId the statement to cancel |
525 | */ |
526 | void cancelStatement(String targetSessionId, int statementId) { |
527 | if (StringUtils.equals(targetSessionId, this.sessionId)) { |
528 | Command cmd = (Command) cache.getObject(statementId, false); |
529 | cmd.cancel(); |
530 | } |
531 | } |
532 | |
533 | /** |
534 | * An input stream with a position. |
535 | */ |
536 | static class CachedInputStream extends FilterInputStream { |
537 | |
538 | private static final ByteArrayInputStream DUMMY = |
539 | new ByteArrayInputStream(new byte[0]); |
540 | private long pos; |
541 | |
542 | CachedInputStream(InputStream in) { |
543 | super(in == null ? DUMMY : in); |
544 | if (in == null) { |
545 | pos = -1; |
546 | } |
547 | } |
548 | |
549 | @Override |
550 | public int read(byte[] buff, int off, int len) throws IOException { |
551 | len = super.read(buff, off, len); |
552 | if (len > 0) { |
553 | pos += len; |
554 | } |
555 | return len; |
556 | } |
557 | |
558 | @Override |
559 | public int read() throws IOException { |
560 | int x = in.read(); |
561 | if (x >= 0) { |
562 | pos++; |
563 | } |
564 | return x; |
565 | } |
566 | |
567 | @Override |
568 | public long skip(long n) throws IOException { |
569 | n = super.skip(n); |
570 | if (n > 0) { |
571 | pos += n; |
572 | } |
573 | return n; |
574 | } |
575 | |
576 | public long getPos() { |
577 | return pos; |
578 | } |
579 | |
580 | } |
581 | |
582 | } |