001package org.apache.commons.jcs3.auxiliary.lateral.socket.tcp;
002
003/*
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements.  See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership.  The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License.  You may obtain a copy of the License at
011 *
012 *   http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing,
015 * software distributed under the License is distributed on an
016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017 * KIND, either express or implied.  See the License for the
018 * specific language governing permissions and limitations
019 * under the License.
020 */
021
022import java.io.IOException;
023import java.io.InputStream;
024import java.io.OutputStream;
025import java.net.InetSocketAddress;
026import java.net.ServerSocket;
027import java.net.Socket;
028import java.net.SocketAddress;
029import java.nio.channels.SelectionKey;
030import java.nio.channels.Selector;
031import java.nio.channels.ServerSocketChannel;
032import java.nio.channels.SocketChannel;
033import java.util.Iterator;
034import java.util.Map;
035import java.util.Set;
036import java.util.concurrent.ConcurrentHashMap;
037import java.util.concurrent.atomic.AtomicBoolean;
038
039import org.apache.commons.jcs3.auxiliary.lateral.LateralElementDescriptor;
040import org.apache.commons.jcs3.auxiliary.lateral.behavior.ILateralCacheListener;
041import org.apache.commons.jcs3.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
042import org.apache.commons.jcs3.engine.CacheInfo;
043import org.apache.commons.jcs3.engine.behavior.ICacheElement;
044import org.apache.commons.jcs3.engine.behavior.ICompositeCacheManager;
045import org.apache.commons.jcs3.engine.behavior.IElementSerializer;
046import org.apache.commons.jcs3.engine.behavior.IShutdownObserver;
047import org.apache.commons.jcs3.engine.control.CompositeCache;
048import org.apache.commons.jcs3.log.Log;
049import org.apache.commons.jcs3.log.LogManager;
050import org.apache.commons.jcs3.utils.serialization.StandardSerializer;
051
052/**
053 * Listens for connections from other TCP lateral caches and handles them. The initialization method
054 * starts a listening thread, which creates a socket server. When messages are received they are
055 * passed to a pooled executor which then calls the appropriate handle method.
056 */
057public class LateralTCPListener<K, V>
058    implements ILateralCacheListener<K, V>, IShutdownObserver
059{
060    /** The logger */
061    private static final Log log = LogManager.getLog( LateralTCPListener.class );
062
063    /** How long the server will block on an accept(). 0 is infinite. */
064    private static final int acceptTimeOut = 1000;
065
066    /** The CacheHub this listener is associated with */
067    private transient ICompositeCacheManager cacheManager;
068
069    /** Map of available instances, keyed by port */
070    private static final ConcurrentHashMap<String, ILateralCacheListener<?, ?>> instances =
071        new ConcurrentHashMap<>();
072
073    /** Configuration attributes */
074    private ITCPLateralCacheAttributes tcpLateralCacheAttributes;
075
076    /** The listener thread */
077    private Thread listenerThread;
078
079    /**
080     * Serializer for reading and writing
081     */
082    private IElementSerializer serializer;
083
084    /** put count */
085    private int putCnt;
086
087    /** remove count */
088    private int removeCnt;
089
090    /** get count */
091    private int getCnt;
092
093    /**
094     * Use the vmid by default. This can be set for testing. If we ever need to run more than one
095     * per vm, then we need a new technique.
096     */
097    private long listenerId = CacheInfo.listenerId;
098
099    /** is this shut down? */
100    private final AtomicBoolean shutdown = new AtomicBoolean();
101
102    /** is this terminated? */
103    private final AtomicBoolean terminated = new AtomicBoolean();
104
105    /**
106     * Gets the instance attribute of the LateralCacheTCPListener class.
107     * <p>
108     * @param ilca ITCPLateralCacheAttributes
109     * @param cacheMgr
110     * @return The instance value
111     * @deprecated Specify serializer
112     */
113    @Deprecated
114    @SuppressWarnings("unchecked") // Need to cast because of common map for all instances
115    public static <K, V> LateralTCPListener<K, V>
116        getInstance( final ITCPLateralCacheAttributes ilca, final ICompositeCacheManager cacheMgr)
117    {
118        return (LateralTCPListener<K, V>) instances.computeIfAbsent(
119                String.valueOf( ilca.getTcpListenerPort() ),
120                k -> {
121                    final LateralTCPListener<K, V> newIns = new LateralTCPListener<>( ilca, new StandardSerializer() );
122
123                    newIns.init();
124                    newIns.setCacheManager( cacheMgr );
125
126                    log.info("Created new listener {0}", ilca::getTcpListenerPort);
127
128                    return newIns;
129                });
130    }
131
132    /**
133     * Gets the instance attribute of the LateralCacheTCPListener class.
134     * <p>
135     * @param ilca ITCPLateralCacheAttributes
136     * @param cacheMgr
137     * @param serializer the serializer to use when receiving
138     * @return The instance value
139     */
140    @SuppressWarnings("unchecked") // Need to cast because of common map for all instances
141    public static <K, V> LateralTCPListener<K, V>
142        getInstance( final ITCPLateralCacheAttributes ilca, final ICompositeCacheManager cacheMgr, final IElementSerializer serializer )
143    {
144        return (LateralTCPListener<K, V>) instances.computeIfAbsent(
145                String.valueOf( ilca.getTcpListenerPort() ),
146                k -> {
147                    final LateralTCPListener<K, V> newIns = new LateralTCPListener<>( ilca, serializer );
148
149                    newIns.init();
150                    newIns.setCacheManager( cacheMgr );
151
152                    log.info("Created new listener {0}", ilca::getTcpListenerPort);
153
154                    return newIns;
155                });
156    }
157
158    /**
159     * Only need one since it does work for all regions, just reference by multiple region names.
160     * <p>
161     * @param ilca ITCPLateralCacheAttributes
162     * @deprecated Specify serializer
163     */
164    @Deprecated
165    protected LateralTCPListener( final ITCPLateralCacheAttributes ilca )
166    {
167        this(ilca, new StandardSerializer());
168    }
169
170    /**
171     * Only need one since it does work for all regions, just reference by multiple region names.
172     * <p>
173     * @param ilca ITCPLateralCacheAttributes
174     * @param serializer the serializer to use when receiving
175     */
176    protected LateralTCPListener( final ITCPLateralCacheAttributes ilca, final IElementSerializer serializer )
177    {
178        this.setTcpLateralCacheAttributes( ilca );
179        this.serializer = serializer;
180    }
181
182    /**
183     * This starts the ListenerThread on the specified port.
184     */
185    @Override
186    public synchronized void init()
187    {
188        try
189        {
190            final int port = getTcpLateralCacheAttributes().getTcpListenerPort();
191            final String host = getTcpLateralCacheAttributes().getTcpListenerHost();
192
193            terminated.set(false);
194            shutdown.set(false);
195
196            final ServerSocketChannel serverSocket = ServerSocketChannel.open();
197
198            SocketAddress endPoint;
199
200            if (host != null && !host.isEmpty())
201            {
202                log.info( "Listening on {0}:{1}", host, port );
203                //Bind the SocketAddress with host and port
204                endPoint = new InetSocketAddress(host, port);
205            }
206            else
207            {
208                log.info( "Listening on port {0}", port );
209                endPoint = new InetSocketAddress(port);
210            }
211
212            serverSocket.bind(endPoint);
213            serverSocket.configureBlocking(false);
214
215            listenerThread = new Thread(() -> runListener(serverSocket),
216                    "JCS-LateralTCPListener-" + host + ":" + port);
217            listenerThread.setDaemon(true);
218            listenerThread.start();
219        }
220        catch ( final IOException ex )
221        {
222            throw new IllegalStateException( ex );
223        }
224    }
225
226    /**
227     * Let the lateral cache set a listener_id. Since there is only one listener for all the
228     * regions and every region gets registered? the id shouldn't be set if it isn't zero. If it is
229     * we assume that it is a reconnect.
230     * <p>
231     * By default, the listener id is the vmid.
232     * <p>
233     * The service should set this value. This value will never be changed by a server we connect
234     * to. It needs to be non static, for unit tests.
235     * <p>
236     * The service will use the value it sets in all send requests to the sender.
237     * <p>
238     * @param id The new listenerId value
239     * @throws IOException
240     */
241    @Override
242    public void setListenerId( final long id )
243        throws IOException
244    {
245        this.listenerId = id;
246        log.debug( "set listenerId = {0}", id );
247    }
248
249    /**
250     * Gets the listenerId attribute of the LateralCacheTCPListener object
251     * <p>
252     * @return The listenerId value
253     * @throws IOException
254     */
255    @Override
256    public long getListenerId()
257        throws IOException
258    {
259        return this.listenerId;
260    }
261
262    /**
263     * Increments the put count. Gets the cache that was injected by the lateral factory. Calls put
264     * on the cache.
265     * <p>
266     * @see org.apache.commons.jcs3.engine.behavior.ICacheListener#handlePut(org.apache.commons.jcs3.engine.behavior.ICacheElement)
267     */
268    @Override
269    public void handlePut( final ICacheElement<K, V> element )
270        throws IOException
271    {
272        putCnt++;
273        if ( log.isInfoEnabled() && getPutCnt() % 100 == 0 )
274        {
275            log.info( "Put Count (port {0}) = {1}",
276                    () -> getTcpLateralCacheAttributes().getTcpListenerPort(),
277                    this::getPutCnt);
278        }
279
280        log.debug( "handlePut> cacheName={0}, key={1}",
281                element::getCacheName, element::getKey);
282
283        getCache( element.getCacheName() ).localUpdate( element );
284    }
285
286    /**
287     * Increments the remove count. Gets the cache that was injected by the lateral factory. Calls
288     * remove on the cache.
289     * <p>
290     * @see org.apache.commons.jcs3.engine.behavior.ICacheListener#handleRemove(String,
291     *      Object)
292     */
293    @Override
294    public void handleRemove( final String cacheName, final K key )
295        throws IOException
296    {
297        removeCnt++;
298        if ( log.isInfoEnabled() && getRemoveCnt() % 100 == 0 )
299        {
300            log.info( "Remove Count = {0}", this::getRemoveCnt);
301        }
302
303        log.debug( "handleRemove> cacheName={0}, key={1}", cacheName, key );
304
305        getCache( cacheName ).localRemove( key );
306    }
307
308    /**
309     * Gets the cache that was injected by the lateral factory. Calls removeAll on the cache.
310     * <p>
311     * @see org.apache.commons.jcs3.engine.behavior.ICacheListener#handleRemoveAll(String)
312     */
313    @Override
314    public void handleRemoveAll( final String cacheName )
315        throws IOException
316    {
317        log.debug( "handleRemoveAll> cacheName={0}", cacheName );
318
319        getCache( cacheName ).localRemoveAll();
320    }
321
322    /**
323     * Gets the cache that was injected by the lateral factory. Calls get on the cache.
324     * <p>
325     * @param cacheName
326     * @param key
327     * @return a ICacheElement
328     * @throws IOException
329     */
330    public ICacheElement<K, V> handleGet( final String cacheName, final K key )
331        throws IOException
332    {
333        getCnt++;
334        if ( log.isInfoEnabled() && getGetCnt() % 100 == 0 )
335        {
336            log.info( "Get Count (port {0}) = {1}",
337                    () -> getTcpLateralCacheAttributes().getTcpListenerPort(),
338                    this::getGetCnt);
339        }
340
341        log.debug( "handleGet> cacheName={0}, key={1}", cacheName, key );
342
343        return getCache( cacheName ).localGet( key );
344    }
345
346    /**
347     * Gets the cache that was injected by the lateral factory. Calls get on the cache.
348     * <p>
349     * @param cacheName the name of the cache
350     * @param pattern the matching pattern
351     * @return Map
352     * @throws IOException
353     */
354    public Map<K, ICacheElement<K, V>> handleGetMatching( final String cacheName, final String pattern )
355        throws IOException
356    {
357        getCnt++;
358        if ( log.isInfoEnabled() && getGetCnt() % 100 == 0 )
359        {
360            log.info( "GetMatching Count (port {0}) = {1}",
361                    () -> getTcpLateralCacheAttributes().getTcpListenerPort(),
362                    this::getGetCnt);
363        }
364
365        log.debug( "handleGetMatching> cacheName={0}, pattern={1}", cacheName, pattern );
366
367        return getCache( cacheName ).localGetMatching( pattern );
368    }
369
370    /**
371     * Gets the cache that was injected by the lateral factory. Calls getKeySet on the cache.
372     * <p>
373     * @param cacheName the name of the cache
374     * @return a set of keys
375     * @throws IOException
376     */
377    public Set<K> handleGetKeySet( final String cacheName ) throws IOException
378    {
379        return getCache( cacheName ).getKeySet(true);
380    }
381
382    /**
383     * This marks this instance as terminated.
384     * <p>
385     * @see org.apache.commons.jcs3.engine.behavior.ICacheListener#handleDispose(String)
386     */
387    @Override
388    public void handleDispose( final String cacheName )
389        throws IOException
390    {
391        log.info( "handleDispose > cacheName={0} | Ignoring message. "
392                + "Do not dispose from remote.", cacheName );
393
394        // TODO handle active deregistration, rather than passive detection
395        dispose();
396    }
397
398    @Override
399    public synchronized void dispose()
400    {
401        if (terminated.compareAndSet(false, true))
402        {
403            notify();
404            listenerThread.interrupt();
405        }
406    }
407
408    /**
409     * Gets the cacheManager attribute of the LateralCacheTCPListener object.
410     * <p>
411     * Normally this is set by the factory. If it wasn't set the listener defaults to the expected
412     * singleton behavior of the cache manager.
413     * <p>
414     * @param name
415     * @return CompositeCache
416     */
417    protected CompositeCache<K, V> getCache( final String name )
418    {
419        return getCacheManager().getCache( name );
420    }
421
422    /**
423     * This is roughly the number of updates the lateral has received.
424     * <p>
425     * @return Returns the putCnt.
426     */
427    public int getPutCnt()
428    {
429        return putCnt;
430    }
431
432    /**
433     * @return Returns the getCnt.
434     */
435    public int getGetCnt()
436    {
437        return getCnt;
438    }
439
440    /**
441     * @return Returns the removeCnt.
442     */
443    public int getRemoveCnt()
444    {
445        return removeCnt;
446    }
447
448    /**
449     * @param cacheMgr The cacheMgr to set.
450     */
451    @Override
452    public void setCacheManager( final ICompositeCacheManager cacheMgr )
453    {
454        this.cacheManager = cacheMgr;
455    }
456
457    /**
458     * @return Returns the cacheMgr.
459     */
460    @Override
461    public ICompositeCacheManager getCacheManager()
462    {
463        return cacheManager;
464    }
465
466    /**
467     * @param tcpLateralCacheAttributes The tcpLateralCacheAttributes to set.
468     */
469    public void setTcpLateralCacheAttributes( final ITCPLateralCacheAttributes tcpLateralCacheAttributes )
470    {
471        this.tcpLateralCacheAttributes = tcpLateralCacheAttributes;
472    }
473
474    /**
475     * @return Returns the tcpLateralCacheAttributes.
476     */
477    public ITCPLateralCacheAttributes getTcpLateralCacheAttributes()
478    {
479        return tcpLateralCacheAttributes;
480    }
481
482    /**
483     * Processes commands from the server socket. There should be one listener for each configured
484     * TCP lateral.
485     * @deprecated No longer used
486     */
487    @Deprecated
488    public class ListenerThread
489        extends Thread
490    {
491        /** The socket listener */
492        private final ServerSocket serverSocket;
493
494        /**
495         * Constructor
496         *
497         * @param serverSocket
498         */
499        public ListenerThread(final ServerSocket serverSocket)
500        {
501            this.serverSocket = serverSocket;
502        }
503
504        /** Main processing method for the ListenerThread object */
505        @Override
506        public void run()
507        {
508            runListener(serverSocket.getChannel());
509        }
510    }
511
512    /**
513     * Processes commands from the server socket. There should be one listener for each configured
514     * TCP lateral.
515     */
516    private void runListener(final ServerSocketChannel serverSocket)
517    {
518        try (Selector selector = Selector.open())
519        {
520            serverSocket.register(selector, SelectionKey.OP_ACCEPT);
521            log.debug("Waiting for clients to connect");
522
523            // Check to see if we've been asked to exit, and exit
524            while (!terminated.get())
525            {
526                int activeKeys = selector.select(acceptTimeOut);
527                if (activeKeys == 0)
528                {
529                    continue;
530                }
531
532                for (Iterator<SelectionKey> i = selector.selectedKeys().iterator(); i.hasNext();)
533                {
534                    if (terminated.get())
535                    {
536                        break;
537                    }
538
539                    SelectionKey key = i.next();
540                    i.remove();
541
542                    if (!key.isValid())
543                    {
544                        continue;
545                    }
546
547                    if (key.isAcceptable())
548                    {
549                        ServerSocketChannel server = (ServerSocketChannel) key.channel();
550                        SocketChannel client = server.accept();
551                        if (client == null)
552                        {
553                            //may happen in non-blocking mode
554                            continue;
555                        }
556
557                        log.info("Connected to client at {0}", client.getRemoteAddress());
558
559                        client.configureBlocking(false);
560                        client.register(selector, SelectionKey.OP_READ);
561                    }
562
563                    if (key.isReadable())
564                    {
565                        handleClient(key);
566                    }
567                }
568            }
569
570            log.debug("Thread terminated, exiting gracefully");
571
572            //close all registered channels
573            selector.keys().forEach(key -> {
574                try
575                {
576                    key.channel().close();
577                }
578                catch (IOException e)
579                {
580                    log.warn("Problem closing channel", e);
581                }
582            });
583        }
584        catch (final IOException e)
585        {
586            log.error( "Exception caught in TCP listener", e );
587        }
588        finally
589        {
590            try
591            {
592                serverSocket.close();
593            }
594            catch (IOException e)
595            {
596                log.error( "Exception closing TCP listener", e );
597            }
598        }
599    }
600
601    /**
602     * A Separate thread that runs when a command comes into the LateralTCPReceiver.
603     * @deprecated No longer used
604     */
605    @Deprecated
606    public class ConnectionHandler
607        implements Runnable
608    {
609        /** The socket connection, passed in via constructor */
610        private final Socket socket;
611
612        /**
613         * Construct for a given socket
614         * @param socket
615         */
616        public ConnectionHandler( final Socket socket )
617        {
618            this.socket = socket;
619        }
620
621        /**
622         * Main processing method for the LateralTCPReceiverConnection object
623         */
624        @Override
625        public void run()
626        {
627            try (InputStream is = socket.getInputStream())
628            {
629                while ( true )
630                {
631                    final LateralElementDescriptor<K, V> led =
632                            serializer.deSerializeFrom(is, null);
633
634                    if ( led == null )
635                    {
636                        log.debug( "LateralElementDescriptor is null" );
637                        continue;
638                    }
639                    if ( led.getRequesterId() == getListenerId() )
640                    {
641                        log.debug( "from self" );
642                    }
643                    else
644                    {
645                        log.debug( "receiving LateralElementDescriptor from another led = {0}",
646                                led );
647
648                        Object obj = handleElement(led);
649                        if (obj != null)
650                        {
651                            OutputStream os = socket.getOutputStream();
652                            serializer.serializeTo(obj, os);
653                            os.flush();
654                        }
655                    }
656                }
657            }
658            catch (final IOException e)
659            {
660                log.info("Caught {0}, closing connection.", e.getClass().getSimpleName(), e);
661            }
662            catch (final ClassNotFoundException e)
663            {
664                log.error( "Deserialization failed reading from socket", e );
665            }
666        }
667    }
668
669    /**
670     * A Separate thread that runs when a command comes into the LateralTCPReceiver.
671     */
672    private void handleClient(final SelectionKey key)
673    {
674        final SocketChannel socketChannel = (SocketChannel) key.channel();
675
676        try
677        {
678            final LateralElementDescriptor<K, V> led =
679                    serializer.deSerializeFrom(socketChannel, null);
680
681            if ( led == null )
682            {
683                log.debug("LateralElementDescriptor is null");
684                return;
685            }
686
687            if ( led.getRequesterId() == getListenerId() )
688            {
689                log.debug( "from self" );
690            }
691            else
692            {
693                log.debug( "receiving LateralElementDescriptor from another led = {0}",
694                        led );
695
696                Object obj = handleElement(led);
697                if (obj != null)
698                {
699                    serializer.serializeTo(obj, socketChannel);
700                }
701            }
702        }
703        catch (final IOException e)
704        {
705            log.info("Caught {0}, closing connection.", e.getClass().getSimpleName(), e);
706            try
707            {
708                socketChannel.close();
709            }
710            catch (IOException e1)
711            {
712                log.error("Error while closing connection", e );
713            }
714        }
715        catch (final ClassNotFoundException e)
716        {
717            log.error( "Deserialization failed reading from socket", e );
718        }
719    }
720
721    /**
722     * This calls the appropriate method, based on the command sent in the Lateral element
723     * descriptor.
724     * <p>
725     * @param led the lateral element
726     * @return a possible response
727     * @throws IOException
728     */
729    private Object handleElement(final LateralElementDescriptor<K, V> led) throws IOException
730    {
731        final String cacheName = led.getPayload().getCacheName();
732        final K key = led.getPayload().getKey();
733        Object obj = null;
734
735        switch (led.getCommand())
736        {
737            case UPDATE:
738                handlePut(led.getPayload());
739                break;
740
741            case REMOVE:
742                // if a hash code was given and filtering is on
743                // check to see if they are the same
744                // if so, then don't remove, otherwise issue a remove
745                if (led.getValHashCode() != -1 &&
746                    getTcpLateralCacheAttributes().isFilterRemoveByHashCode())
747                {
748                    final ICacheElement<K, V> test = getCache( cacheName ).localGet( key );
749                    if ( test != null )
750                    {
751                        if ( test.getVal().hashCode() == led.getValHashCode() )
752                        {
753                            log.debug( "Filtering detected identical hashCode [{0}], "
754                                    + "not issuing a remove for led {1}",
755                                    led.getValHashCode(), led );
756                            return null;
757                        }
758                        log.debug( "Different hash codes, in cache [{0}] sent [{1}]",
759                                test.getVal()::hashCode, led::getValHashCode );
760                    }
761                }
762                handleRemove( cacheName, key );
763                break;
764
765            case REMOVEALL:
766                handleRemoveAll( cacheName );
767                break;
768
769            case GET:
770                obj = handleGet( cacheName, key );
771                break;
772
773            case GET_MATCHING:
774                obj = handleGetMatching( cacheName, (String) key );
775                break;
776
777            case GET_KEYSET:
778                obj = handleGetKeySet(cacheName);
779                break;
780
781            default: break;
782        }
783
784        return obj;
785    }
786
787    /**
788     * Shuts down the receiver.
789     */
790    @Override
791    public void shutdown()
792    {
793        if ( shutdown.compareAndSet(false, true) )
794        {
795            log.info( "Shutting down TCP Lateral receiver." );
796            dispose();
797        }
798        else
799        {
800            log.debug( "Shutdown already called." );
801        }
802    }
803}