In my implementation, after calling selectedKeys() and checking readability with
if ( key.isReadable() ), another thread processes the incoming message.
My problem is that selectedKeys() keeps returning the already processed key until the processing thread has actually read the data from the channel.
I tried removing OP_READ from the key's interest set with interestOps() and setting it again after the thread has finished reading the data from the channel.
The problem with this solution is that setting the OP_READ interest again takes 40-500 ms, which is far too slow.
Is there any way to read data from a channel in another thread (without blocking the main thread, so that it can keep monitoring the other keys)
without changing the interest set of the key that the other thread is currently reading from?
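One way to picture the question above: the selector thread does the non-blocking read itself and hands only the bytes to a worker, so OP_READ never has to be switched off. The following is a minimal sketch under assumptions, not code from my application. The names ReadThenDispatch, onReadable and demo are made up for illustration, a Pipe stands in for the real socket, and it uses java.util.concurrent and lambdas, which are not available on 1.4.2.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical helper: drain the channel on the selector thread, process elsewhere.
final class ReadThenDispatch {
    private final ExecutorService workers = Executors.newFixedThreadPool(2);

    // Called on the selector thread when key.isReadable() is true.
    // Returns the number of bytes handed to a worker, or -1 on end of stream.
    int onReadable(SelectionKey key, Consumer<byte[]> handler) throws IOException {
        ReadableByteChannel channel = (ReadableByteChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(4096);
        int n = channel.read(buffer);      // non-blocking channel: returns at once
        if (n < 0) {                       // peer closed the connection
            key.cancel();
            channel.close();
            return -1;
        }
        if (n == 0) {
            return 0;                      // nothing to hand off
        }
        buffer.flip();
        byte[] data = new byte[buffer.remaining()];
        buffer.get(data);                  // copy, so the worker owns its bytes
        workers.submit(() -> handler.accept(data));
        return n;
    }

    // Small self-check: a Pipe plays the role of the client connection.
    static String demo() {
        ReadThenDispatch dispatcher = new ReadThenDispatch();
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            pipe.source().configureBlocking(false);
            pipe.source().register(selector, SelectionKey.OP_READ);
            pipe.sink().write(ByteBuffer.wrap("hello".getBytes(StandardCharsets.US_ASCII)));

            CompletableFuture<String> result = new CompletableFuture<>();
            selector.select(1000);
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                if (key.isValid() && key.isReadable()) {
                    dispatcher.onReadable(key,
                        data -> result.complete(new String(data, StandardCharsets.US_ASCII)));
                }
            }
            pipe.sink().close();
            pipe.source().close();
            return result.get(2, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            dispatcher.workers.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because the data has already been read when the worker starts, the key should not be selected again for the same bytes. The trade-off is that message framing (partial reads) would then have to be handled per connection, which this sketch leaves out.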
Here is some code from my main thread, CommServerSocket:
    public void run() {
        int keysNumber = 1;
        while ( !this.isInterrupted()) {
            SelectionKey key = null;
            try {
                if ( keysNumber != 0 ) {
                    if ( this.trace.isTraceLevelOn( ATrace.DETAIL | JCubaTrace.COMMUNICATION )) {
                        this.trace.communication( ATrace.DETAIL, "Select-Call..." );
                    }
                }
                // Wait at most 50 ms for ready keys
                keysNumber = this.srvSelector.select( 50 );
                if ( keysNumber > 0 ) {
                    if ( this.trace.isTraceLevelOn( ATrace.DETAIL | JCubaTrace.COMMUNICATION )) {
                        this.trace.communication( ATrace.DETAIL, "Select-Call returns <" + keysNumber + ">" );
                    }
                    Set acceptSelectedSet = this.srvSelector.selectedKeys();
                    Iterator it = acceptSelectedSet.iterator();
                    while ( it.hasNext()) {
                        key = (SelectionKey) it.next();
                        it.remove();
                        if ( key.isValid()) {
                            this.trace.communication( ATrace.ALL, "Key <" + key + "> for CommId <" + key.attachment() + ">" );
                            try {
                                if ( key.isConnectable()) {
                                    this.trace.communication( ATrace.ALL, "Key for CommId <" + key.attachment() + "> is Connectable" );
                                }
                                if ( key.isAcceptable()) {
                                    ...
                                }
                                if ( key.isReadable()) {
                                    this.trace.communication( ATrace.ALL, "Key for CommId <" + key.attachment() + "> is Readable" );
                                    // Hands the key over to the processing thread
                                    this.processClientRequest( key );
                                }
                            } catch( CancelledKeyException e ) {
                                this.trace.exception( e );
                                try {
                                    this.trace.communication( ATrace.ALL, ATrace.ERROR + "Key for CommId <" + key.attachment() + "> not valid anymore!" );
                                    key.cancel();
                                } catch( Exception err ) {
                                    // Log the inner exception (the original logged e here)
                                    this.trace.exception( err );
                                }
                            }
                        } else {
                            this.trace.communication( ATrace.ALL, ATrace.ERROR + "Key for CommId <" + key.attachment() + "> not valid anymore!" );
                            key.cancel();
                        }
                    }
                }
            } catch( CancelledKeyException e ) {
                this.trace.exception( e );
                try {
                    this.trace.communication( ATrace.ALL, ATrace.ERROR + "Key for CommId <" + key.attachment() + "> not valid anymore!" );
                } catch( Exception err ) {
                    this.trace.exception( err );
                }
            } catch ( Exception exc ) {
                exc.printStackTrace();
                this.trace.exception( exc );
            }
        }
    ... ...
    public void processDataStart() {
        this.trace.communication( ATrace.ALL, "Reading data for CommId <" + this.commId + "> ..." );
        long startTime = System.currentTimeMillis();
        // Remove OP_READ from the interest set while the worker reads
        this.selectionKey.interestOps( this.interestOps );
        long endTime = System.currentTimeMillis();
        this.trace.communication( ATrace.ALL, "<SelectionKey.OP_READ> for CommId <" + this.commId + "> " + "deleted! Duration <" + ( endTime - startTime ) + ">" );
        ClientCommThread processThread = new ClientCommThread( this );
        processThread.start();
    }
    .... ....
    private void processDataEnd() {
        this.trace.communication( ATrace.ALL, "Data for CommId <" + this.commId + "> read!" );
        long startTime = System.currentTimeMillis();
        // Set OP_READ again; this is the call that takes 40-500 ms
        this.selectionKey.interestOps( this.interestOps | SelectionKey.OP_READ );
        long endTime = System.currentTimeMillis();
        this.trace.communication( ATrace.ALL, "<SelectionKey.OP_READ> for CommId <" + this.commId + "> " + "set again! Duration <" + ( endTime - startTime ) + ">" );
        if ( this.deregisterFlag ) {
            this.commServerSocket.deregit( this.commId );
        }
    }
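A possible variation on processDataEnd(), sketched under assumptions: the SelectionKey documentation says that whether interestOps() blocks, and for how long, is implementation-dependent, so the delay might come from calling it on the worker thread while the main thread is inside select(). I have not confirmed that this is the cause here. The sketch below lets the worker only queue the change and call wakeup(), and the selector thread applies it before its next select(). The class InterestOpsQueue and its methods are hypothetical names, a Pipe stands in for the socket, and on 1.4.2 a synchronized LinkedList would have to replace ConcurrentLinkedQueue.

```java
import java.io.IOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical helper: interest-set changes are applied only by the selector thread.
final class InterestOpsQueue {
    private final Selector selector;
    private final Queue<Runnable> pending = new ConcurrentLinkedQueue<>();

    InterestOpsQueue(Selector selector) {
        this.selector = selector;
    }

    // Called from a worker thread instead of key.interestOps(...).
    void requestInterestOps(final SelectionKey key, final int ops) {
        pending.add(() -> {
            if (key.isValid()) {
                key.interestOps(ops);
            }
        });
        selector.wakeup();   // makes a running or the next select() return promptly
    }

    // Called by the selector thread right before each select().
    int applyPending() {
        int applied = 0;
        Runnable change;
        while ((change = pending.poll()) != null) {
            change.run();
            applied++;
        }
        return applied;
    }

    // Small self-check: a worker asks for OP_READ, the calling thread applies it.
    static boolean demo() {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try {
                pipe.source().configureBlocking(false);
                // Registered with an empty interest set, like a key whose OP_READ was removed
                SelectionKey key = pipe.source().register(selector, 0);
                InterestOpsQueue queue = new InterestOpsQueue(selector);

                Thread worker = new Thread(
                    () -> queue.requestInterestOps(key, SelectionKey.OP_READ));
                worker.start();
                worker.join();

                queue.applyPending();   // what the selector loop would do each pass
                return key.interestOps() == SelectionKey.OP_READ;
            } finally {
                pipe.source().close();
                pipe.sink().close();
            }
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In the run() loop this would mean one applyPending() call before this.srvSelector.select( 50 ), and processDataEnd() calling requestInterestOps() instead of interestOps().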
I hope someone can help me!
thx
mikr
PS: Changing from 1.4.2_05 to 1.4.2_08 slightly reduced the time needed to set the interest again.