Tuesday 31 December 2013

Semaphore In Multithreading



A view on Semaphore


Question: - What is Semaphore?

The Semaphore class was introduced in JDK 1.5 in the java.util.concurrent package for use in multithreaded programs. A semaphore can be treated as a counter that holds a specified number of permits for a shared resource. To access the resource, the currently executing thread must first acquire a permit. If every permit is already held by other threads, the current thread has to wait until one of them releases a permit. This concurrency utility is helpful for implementing thread pools, database connection pools, the producer-consumer pattern, and similar designs.

A Semaphore can be created with a single parameter (permits) or with two initialization parameters, which are as follows:
  • Number of permits: - The number of permits controls how many resources we want to manage using the semaphore.
  • Fairness: - Because several threads compete for permits and the CPU switches context between them, there is a concept of fairness.
In theory, a thread could attempt to acquire a permit and block because none is available. A second thread could then request a permit at the moment one is returned, before the first thread has had time to wake up, and take that permit ahead of the waiting thread. This could theoretically repeat indefinitely, leaving the first thread blocked forever. The fairness policy addresses this: if we tell the Semaphore to be fair, it remembers the order in which threads requested permits and serves them in that order.

After creating the Semaphore object, we manage permits through the acquire() and release() methods. The acquire() method obtains one or more permits from the Semaphore, blocking until they are available, and the release() method returns one or more permits.
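The acquire/release pattern can be sketched as follows. This is only a minimal illustration: the class name PermitDemo and the method runGuarded are hypothetical and not part of the original program. It shows a fair semaphore with two permits and a try/finally block, so the permit is returned even if the task throws.

```java
import java.util.concurrent.Semaphore;

// Hypothetical demo class: guards a piece of work with a fair semaphore.
final class PermitDemo {
    // Two permits; the second argument (true) asks for fair, first-come ordering.
    static final Semaphore PERMITS = new Semaphore(2, true);

    // Runs the task while holding one permit and returns the number of
    // permits still available during the task, or -1 if interrupted.
    static int runGuarded(Runnable task) {
        try {
            PERMITS.acquire();              // blocks until a permit is free
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;                      // no permit was acquired
        }
        try {
            task.run();
            return PERMITS.availablePermits();
        } finally {
            PERMITS.release();              // always hand the permit back
        }
    }

    public static void main(String[] args) {
        int left = runGuarded(() -> System.out.println("working with a permit"));
        System.out.println("permits left while working: " + left);                    // 1
        System.out.println("permits after release: " + PERMITS.availablePermits());   // 2
    }
}
```

Releasing in a finally block is a design choice worth copying: a permit that is never released shrinks the semaphore for every other thread.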

Question: - Where to use Semaphore?

Suppose we want to implement a better DB connection pool: when all connections are in use and no more are available, a request should wait for a connection to become available instead of throwing an error. A semaphore with one permit per connection gives this behaviour.
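A rough sketch of such a waiting pool is given here. The class BoundedPool and its methods borrow(), giveBack() and idleCount() are hypothetical names chosen for illustration, and plain Strings stand in for real database connections.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;

// Hypothetical pool: callers wait in borrow() instead of getting an error.
final class BoundedPool<T> {
    private final Queue<T> idle = new ConcurrentLinkedQueue<>();
    private final Semaphore available;

    BoundedPool(Collection<T> resources) {
        idle.addAll(resources);
        // One permit per pooled resource, served in request order.
        available = new Semaphore(resources.size(), true);
    }

    // Waits until a resource is free, then hands it out.
    T borrow() {
        try {
            available.acquire();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return idle.poll(); // a held permit guarantees an idle entry exists
    }

    // Puts the resource back first, then releases the permit.
    void giveBack(T resource) {
        idle.add(resource);
        available.release();
    }

    int idleCount() {
        return available.availablePermits();
    }

    public static void main(String[] args) {
        BoundedPool<String> pool = new BoundedPool<>(Arrays.asList("conn-1", "conn-2"));
        String c = pool.borrow();
        System.out.println("borrowed " + c + ", idle now " + pool.idleCount()); // idle now 1
        pool.giveBack(c);
        System.out.println("idle after return " + pool.idleCount());            // 2
    }
}
```

The resource is added back to the queue before the permit is released, so a thread that wakes up in borrow() always finds something to take.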

Important facts about Semaphore

The Semaphore class provides several overloaded versions of the tryAcquire() method. The no-argument version acquires a permit if one is available and returns immediately with the value true, reducing the number of available permits by one. If no permit is available, it returns immediately with the value false.

A second very useful method is acquireUninterruptibly(), which acquires a permit if one is available and returns immediately, reducing the number of available permits by one. If no permit is available, the current thread becomes disabled for thread scheduling purposes and lies dormant until some other thread invokes the release() method for this semaphore and the current thread is next to be assigned a permit. Unlike acquire(), it does not throw InterruptedException; a thread interrupted while waiting simply keeps waiting.
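The non-blocking and timed variants can be tried out with a small sketch like this one. The class TryAcquireDemo and its attempt() helper are hypothetical; the Semaphore calls used are tryAcquire(), tryAcquire(long, TimeUnit), acquireUninterruptibly() and release().

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical demo of the non-blocking and timed acquire variants.
final class TryAcquireDemo {
    // Returns "acquired" if a permit was free, otherwise "busy"; never blocks.
    static String attempt(Semaphore semaphore) {
        return semaphore.tryAcquire() ? "acquired" : "busy";
    }

    public static void main(String[] args) throws InterruptedException {
        Semaphore single = new Semaphore(1);

        System.out.println(attempt(single)); // acquired
        System.out.println(attempt(single)); // busy: the only permit is taken

        // Timed variant: waits up to 100 ms, then gives up with false.
        boolean gotIt = single.tryAcquire(100, TimeUnit.MILLISECONDS);
        System.out.println(gotIt);           // false

        single.release();                    // return the permit taken above
        single.acquireUninterruptibly();     // succeeds at once, ignores interrupts
        System.out.println(single.availablePermits()); // 0
        single.release();
    }
}
```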

The program below demonstrates the use of Semaphore and its methods.
 

package com.gaurav.corejava.multithreading;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

class CustomerNames {
            private final Semaphore lock = new Semaphore(4, true);
            private final Map<String, Boolean> names = new ConcurrentHashMap<String, Boolean>();

            public CustomerNames() {
                        names.put("Gaurav", false);
                        names.put("Kumar", false);
                        names.put("Aryan", false);
                        names.put("Shivam", false);
            }

            public void putName(String string) {
                        if (insertedNameInMap(string)) {
                                    lock.release();
                        }
            }

            public String getName() {
                        try {
                                    lock.acquire();
                                    return retrieveNames();
                        } catch (InterruptedException e) {
                                    // acquire() can only fail by interruption; restore the flag for callers
                                    Thread.currentThread().interrupt();
                        }
                        return null;
            }

            protected synchronized String retrieveNames() {
                        for (String nameString : names.keySet()) {
                                    Boolean extractedName = names.get(nameString);
                                    if (!extractedName.booleanValue()) {
                                                names.put(nameString, true);
                                                return nameString;
                                    }
                        }
                        return null;
            }

            protected synchronized boolean insertedNameInMap(String name) {
                        for (String nameStr : names.keySet()) {
                                    if (nameStr.equalsIgnoreCase(name)) {
                                                names.put(nameStr, false);
                                                return true;
                                    }
                        }
                        return false;
            }
}

class Worker extends Thread {
            private final int counter;
            private final CustomerNames custNames;

            public Worker(CustomerNames server, int number) {
                        this.custNames = server;
                        this.counter = number;
            }

            @Override
            public void run() {
                        String name = custNames.getName();
                        System.out.println("Acquired Customer Name is : " + name
                + " by Thread " + counter);
                        try {
                                    Thread.sleep(3000);
                        } catch (Exception e) {
                                    e.printStackTrace();
                        }
                        custNames.putName(name);
                        System.out.println("Released Customer Name is : " + name
                + " by Thread " + counter);
            }
}

/**
 * @author gaurav
 *
 */
public class SemaphoreDemonstrationTest {
            public static void main(String[] args) {
                        CustomerNames custNames = new CustomerNames();
                        for (int i = 0; i < 4; i++) {
                                    Worker worker = new Worker(custNames, i);
                                    worker.start();
                        }
            }
}

Description of the above program:-

The CustomerNames class manages a set of four resources, which are customer names stored as Strings. It keeps the names in a map from each name to a boolean that records whether the name is currently checked out. It creates a Semaphore, named lock, that maintains 4 permits and is fair (it preserves the order in which threads request a resource). To obtain a name from the CustomerNames class, we call the getName() method. This method first acquires a permit by calling the Semaphore's acquire() method and then calls the internal synchronized method retrieveNames(), which finds an available name by iterating over the map and marks it as taken.

When we are finished with a name, we return it to the CustomerNames class by calling the putName() method. The putName() method first marks the name as available again through insertedNameInMap() and, if the name was found, releases the permit.

The Worker class is a thread class that acquires a name from the CustomerNames class, sleeps for 3 seconds, and then returns the same name to CustomerNames.

Finally, the SemaphoreDemonstrationTest class creates 4 Worker threads, each of which attempts to acquire one of the four managed name Strings. Below is the output of one run of this example; the order of the lines can vary between runs.

Result:-

Acquired Customer Name is : Aryan by Thread 0
Acquired Customer Name is : Gaurav by Thread 1
Acquired Customer Name is : Shivam by Thread 2
Acquired Customer Name is : Kumar by Thread 3
Released Customer Name is : Aryan by Thread 0
Released Customer Name is : Shivam by Thread 2
Released Customer Name is : Gaurav by Thread 1
Released Customer Name is : Kumar by Thread 3

Thursday 12 December 2013

How ConcurrentHashMap works internally in Java?

Internal process of ConcurrentHashMap 

Question : How does ConcurrentHashMap achieve its scalability and improved performance over synchronized HashMap and Hashtable? How does it achieve its thread-safety?

Answer : 



java.lang.Object
  • java.util.AbstractMap<K,V>
    • java.util.concurrent.ConcurrentHashMap<K,V>

Type Parameters:
K - the type of keys maintained by this map
V - the type of mapped values

All Implemented Interfaces:
Serializable, ConcurrentMap<K,V>, Map<K,V>


public ConcurrentHashMap() :- Creates a new, empty map with a default initial capacity (16), load factor (0.75) and concurrencyLevel (16).

public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) :- Creates a new, empty map with the specified initial capacity, load factor and concurrency level.
Parameters:

initialCapacity - the initial capacity. The implementation performs internal sizing to accommodate this many elements.

loadFactor - the load factor threshold, used to control resizing. Resizing may be performed when the average number of elements per bin exceeds this threshold.

concurrencyLevel - the estimated number of concurrently updating threads. The implementation performs internal sizing to try to accommodate this many threads.

Throws:
IllegalArgumentException - if the initial capacity is negative or the load factor or concurrencyLevel are nonpositive.


The number of segments can go up to a maximum of 2 to the power 16. The higher the concurrency level, the greater the number of segments and the smaller the hash table that each segment manages. Using a significantly higher value than needed wastes space and time, while a significantly lower value can lead to thread contention.

Nowadays ConcurrentHashMap is one of the most popular concurrent collection classes in Java.
It is an alternative to Hashtable or a synchronized HashMap.

A ConcurrentHashMap is divided into a number of segments; the default is 16 on initialization. ConcurrentHashMap therefore allows up to 16 threads to update the map concurrently, as long as each thread works on a different segment. (This describes the segment-based implementation used up to Java 7; Java 8 replaced the segments with finer-grained locking on individual bins.)

ConcurrentHashMap offers these advantages over Hashtable through a simple mechanism: instead of keeping one wide lock on the whole map, the collection maintains a list of 16 locks by default, each of which guards one segment (a group of buckets) of the map. This effectively means that 16 threads can modify the collection at the same time, as long as they are all working on different segments.

So this hash table supports full concurrency of retrievals, with a resizable array of hash buckets, each consisting of a linked list of HashEntry elements. Instead of a single collection lock, ConcurrentHashMap uses a fixed pool of locks that form a partition over the collection of buckets. This class follows the same functional specification as Hashtable, and includes versions of methods corresponding to each method available in Hashtable.

However, even though all operations are thread-safe, retrieval operations do not entail locking, and there is not any support for locking the entire table in a way that prevents all access.  

There is a static final inner class named HashEntry defined inside ConcurrentHashMap. The code snippet is shown below:-


static final class HashEntry<K,V> {
        final K key;
        final int hash;
        volatile V value;
        final HashEntry<K,V> next;

        HashEntry(K key, int hash, HashEntry<K,V> next, V value) {
            this.key = key;
            this.hash = hash;
            this.next = next;
            this.value = value;
        }
}


This class (HashEntry) takes advantage of final and volatile fields so that changes become visible to other threads running in parallel, without acquiring an expensive lock for read operations.

Synchronizing the whole map misses a possible optimisation: hash maps store their data in a series of separate buckets, so it is enough to lock only the portion of the map that is being accessed. This optimization is generally called lock striping.

The basic strategy is to subdivide the table among multiple segments so that the lock is applied only on a segment rather than the entire table. Each segment manages its own internal hash table, whose size is the smallest power of two 2^x such that 2^x >= (capacity / number of segments).

Locking is applied only for updates. Retrievals run with full concurrency and reflect the results of the most recently completed update operations. The combination of lock striping and prudent use of volatile variables makes ConcurrentHashMap an excellent candidate for multithreaded environments.
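The idea of lock striping can be illustrated with a deliberately simplified sketch. The class StripedMap is hypothetical and is not the JDK's code: it keeps one ReentrantLock per stripe and, unlike the real ConcurrentHashMap, it also takes the lock for reads to stay short and obviously correct.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration of lock striping; not the JDK implementation.
final class StripedMap<K, V> {
    private final ReentrantLock[] locks;
    private final Map<K, V>[] stripes;

    @SuppressWarnings("unchecked")
    StripedMap(int stripeCount) {
        locks = new ReentrantLock[stripeCount];
        stripes = (Map<K, V>[]) new Map[stripeCount];
        for (int i = 0; i < stripeCount; i++) {
            locks[i] = new ReentrantLock();
            stripes[i] = new HashMap<>();
        }
    }

    // Maps a key to its stripe; the mask keeps the hash non-negative.
    private int stripeFor(Object key) {
        return (key.hashCode() & 0x7fffffff) % locks.length;
    }

    // Only the stripe that owns the key is locked, so writers to
    // different stripes do not block each other.
    V put(K key, V value) {
        int i = stripeFor(key);
        locks[i].lock();
        try {
            return stripes[i].put(key, value);
        } finally {
            locks[i].unlock();
        }
    }

    // Simplification: locks for reads too, which the real class avoids.
    V get(Object key) {
        int i = stripeFor(key);
        locks[i].lock();
        try {
            return stripes[i].get(key);
        } finally {
            locks[i].unlock();
        }
    }

    public static void main(String[] args) {
        StripedMap<String, Integer> map = new StripedMap<>(16);
        map.put("FIRST", 1);
        map.put("SECOND", 2);
        System.out.println(map.get("FIRST") + " " + map.get("SECOND")); // 1 2
    }
}
```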
           
Two main features of ConcurrentHashMap are:

  • A write to ConcurrentHashMap locks only a part of the map.
  • Reads can generally happen without locking.

Another static final inner class named Segment is defined in ConcurrentHashMap. The key-value table of ConcurrentHashMap is divided among Segments, each of which extends ReentrantLock, implements Serializable, and is itself a concurrently readable hash table. Each segment uses a single lock to update its elements consistently, flushing all the changes to main memory.
                         
ConcurrentHashMap doesn't allow null keys or null values. The key can't be null, since its hash value is used to locate both the segment and the bucket within that segment: the upper bits of the hash select the segment index, and the lower bits select the table index within the segment.
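The split of the hash into a segment index and a table index can be sketched as below. The class SegmentIndexDemo and its constants are hypothetical; the shift of 28 and mask of 15 assume the default of 16 segments in the segment-based implementation, and the last method simply checks the null-key rule against the real ConcurrentHashMap.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper; shift and mask assume the default 16 segments.
final class SegmentIndexDemo {
    static final int SEGMENT_SHIFT = 28; // 32 bits minus the 4 bits that address 16 segments
    static final int SEGMENT_MASK = 15;  // number of segments minus one

    // The upper bits of the hash choose the segment.
    static int segmentIndex(int hash) {
        return (hash >>> SEGMENT_SHIFT) & SEGMENT_MASK;
    }

    // The lower bits choose the bucket; tableLength must be a power of two.
    static int tableIndex(int hash, int tableLength) {
        return hash & (tableLength - 1);
    }

    // Returns true if ConcurrentHashMap rejects a null key.
    static boolean rejectsNullKey() {
        try {
            new ConcurrentHashMap<String, String>().put(null, "value");
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        int hash = 0xA0000003;
        System.out.println(segmentIndex(hash));  // 10 (top four bits are 1010)
        System.out.println(tableIndex(hash, 4)); // 3  (low two bits are 11)
        System.out.println(rejectsNullKey());    // true
    }
}
```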
           
putIfAbsent : This method of ConcurrentHashMap is similar to put, except that the value is not overwritten if the key already exists. It is a bit faster than the piece of code below because it avoids traversing the map twice. It goes through the same internal flow as put, but leaves the old value in place if the key already exists and simply returns that old value.

if (!map.containsKey(key))   
       return map.put(key, value);
else   
       return map.get(key);

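The putIfAbsent method is equivalent to the code above, except that the action is performed atomically. Because the containsKey check and the following put are two separate steps, another thread can insert the same key in between, so the code above may cause unexpected behaviour in a multithreaded environment.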
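The return values of putIfAbsent can be seen in a short sketch. The class PutIfAbsentDemo and its demo() method are hypothetical names; ConcurrentMap.putIfAbsent is the real API being exercised.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical demo of putIfAbsent return values.
final class PutIfAbsentDemo {
    // Returns {result of first call, result of second call, final value}.
    static String[] demo() {
        ConcurrentMap<String, String> map = new ConcurrentHashMap<>();
        String first = map.putIfAbsent("key", "v1");  // key absent: stores v1, returns null
        String second = map.putIfAbsent("key", "v2"); // key present: keeps v1, returns v1
        return new String[] { first, second, map.get("key") };
    }

    public static void main(String[] args) {
        String[] r = demo();
        System.out.println(r[0]); // null
        System.out.println(r[1]); // v1
        System.out.println(r[2]); // v1
    }
}
```

A null result therefore means "this call inserted the value", which is handy when several threads race to initialise the same key.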
                
The put() method of ConcurrentHashMap holds the segment lock for the duration of its execution and doesn't necessarily block other threads from calling get() on the map. It first searches the bucket's chain for the given key and, if the key is found, simply updates the volatile value field. Otherwise it creates a new HashEntry object and inserts it at the head of the list. The Iterator returned by ConcurrentHashMap is fail-safe.



Question : What is the difference between fail-fast iterator and fail-safe iterator?

Answer : fail-fast : As the name implies, fail-fast iterators fail as soon as they detect that the structure of the collection has changed since iteration started. A structural change means adding or removing an element while the collection is being iterated over; merely replacing the value of an existing key in a HashMap does not count. This behavior is implemented by keeping a modification count: if the iterating thread notices a change in the modification count, it throws ConcurrentModificationException.

fail-safe : If the collection is modified structurally while a thread is iterating over it, a fail-safe iterator doesn't throw any exception. The iterators of CopyOnWriteArrayList and CopyOnWriteArraySet achieve this by working on a snapshot of the collection instead of the original, while the iterator of ConcurrentHashMap is weakly consistent: it walks the live table and may or may not reflect modifications made after it was created. None of them ever throws ConcurrentModificationException.
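The snapshot behaviour of a copy-on-write iterator can be observed with a small sketch. The class SnapshotIterationDemo and its method are hypothetical; the loop adds elements while iterating, which would throw ConcurrentModificationException on an ArrayList.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical demo: iterating a CopyOnWriteArrayList while adding to it.
final class SnapshotIterationDemo {
    // Returns {elements seen by the loop, final list size}.
    static int[] iterateWhileAdding() {
        List<String> list = new CopyOnWriteArrayList<>(Arrays.asList("a", "b"));
        int seen = 0;
        for (String s : list) {   // the iterator works on a snapshot of two elements
            list.add(s + "!");    // modifies the list, not the snapshot
            seen++;
        }
        return new int[] { seen, list.size() };
    }

    public static void main(String[] args) {
        int[] r = iterateWhileAdding();
        System.out.println("seen=" + r[0] + " size=" + r[1]); // seen=2 size=4
    }
}
```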

Question : Is it possible for two threads to update the ConcurrentHashMap simultaneously?

Answer : Yes, it's possible to have two parallel threads writing to the ConcurrentHashMap at the same time. With the default concurrency level, at most 16 threads can write in parallel, while reads are not limited in this way. In the worst case, if two keys lie in the same segment or partition of the ConcurrentHashMap, those two writes cannot proceed in parallel.
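A quick way to convince yourself that parallel writers do not lose updates is a sketch along these lines. The class ParallelWritersDemo and its fill() method are hypothetical; each thread writes its own distinct keys, and the size is read only after all threads have finished.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo: several threads writing to one ConcurrentHashMap.
final class ParallelWritersDemo {
    // Starts the given number of writer threads and returns the final map size.
    static int fill(int threads, int perThread) {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            final int id = t;
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    map.put(id + "-" + i, i); // keys are distinct across threads
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();                // wait for every writer to finish
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while joining", e);
            }
        }
        return map.size();
    }

    public static void main(String[] args) {
        System.out.println(fill(4, 1000)); // 4000: no update was lost
    }
}
```

Running the same loop against a plain HashMap is not safe and may lose entries or corrupt the table, which is the problem ConcurrentHashMap is meant to solve.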

Question : Can multiple threads read from a given Hashtable concurrently ?

Answer : No. The get() method of Hashtable is synchronized (as is get() on a synchronized HashMap), so only one thread can read a value at any given point of time. Full concurrency for reads is possible only in ConcurrentHashMap, through its use of volatile fields.


ConcurrentHashMap example:- 



package com.gaurav.collection;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ConcurrentHashMapTest {

      public static void main(String[] args) {

            // ConcurrentHashMap implementation details
            Map<String, String> concurrentHMap = new ConcurrentHashMap<String, String>();
            concurrentHMap.put("FIRST", "1");
            concurrentHMap.put("SECOND", "2");
            concurrentHMap.put("THIRD", "3");
            concurrentHMap.put("FOURTH", "4");
            concurrentHMap.put("FIFTH", "5");
            concurrentHMap.put("SIXTH", "6");
            System.out.println("ConcurrentHashMap before iteration : " + concurrentHMap);
            Iterator<String> ccHMapIterator = concurrentHMap.keySet().iterator();

            while (ccHMapIterator.hasNext()) {
                  String key = ccHMapIterator.next();
                  if (key.equals("FOURTH"))
                        concurrentHMap.put(key + "-GAURAV", "I AM NEW KEY IN CCHMAP");
            }
            System.out.println("ConcurrentHashMap after iteration : " + concurrentHMap);

            System.out.println("**************************************");
            // HashMap implementation details
            Map<String, String> hMap = new HashMap<String, String>();
            hMap.put("FIRST", "1");
            hMap.put("SECOND", "2");
            hMap.put("THIRD", "3");
            hMap.put("FOURTH", "4");
            hMap.put("FIFTH", "5");
            hMap.put("SIXTH", "6");
            System.out.println("HashMap before iteration : " + hMap);
            Iterator<String> hashMapIterator = hMap.keySet().iterator();

            while (hashMapIterator.hasNext()) {
                  String key = hashMapIterator.next();
                  if (key.equals("FOURTH"))
                        hMap.put(key + "-SHIVAM", "I AM NEW KEY IN HMAP");
            }
            System.out.println("HashMap after iteration : " + hMap);
      }
}

Result:-



ConcurrentHashMap before iteration : {THIRD=3, FIRST=1, SIXTH=6, FOURTH=4, FIFTH=5, SECOND=2}
ConcurrentHashMap after iteration : {THIRD=3, FIRST=1, SIXTH=6, FOURTH-GAURAV=I AM NEW KEY IN CCHMAP, FOURTH=4, FIFTH=5, SECOND=2}
**************************************
HashMap before iteration : {FIFTH=5, THIRD=3, FOURTH=4, SECOND=2, SIXTH=6, FIRST=1}
Exception in thread "main" java.util.ConcurrentModificationException
      at java.util.HashMap$HashIterator.nextEntry(HashMap.java:793)
      at java.util.HashMap$KeyIterator.next(HashMap.java:828)
      at com.gaurav.collection.ConcurrentHashMapTest.main(ConcurrentHashMapTest.java:43)




ConcurrentHashMap advantages summary
  • ConcurrentHashMap allows concurrent reads and thread-safe updates. All operations of ConcurrentHashMap are thread-safe.
  • During an update, ConcurrentHashMap locks only a specific part of the map instead of the whole map.
  • Concurrent updates are achieved by internally dividing the map into small segments, whose number is defined by the concurrency level.
  • The iterator returned by ConcurrentHashMap is fail-safe and never throws ConcurrentModificationException.
  • ConcurrentHashMap doesn't allow null as a key or a value.