001// License: GPL. For details, see LICENSE file. 002package org.openstreetmap.josm.data.cache; 003 004import java.io.IOException; 005import java.net.URL; 006import java.util.Iterator; 007import java.util.Map; 008import java.util.concurrent.ConcurrentHashMap; 009import java.util.concurrent.LinkedBlockingDeque; 010import java.util.concurrent.Semaphore; 011import java.util.concurrent.TimeUnit; 012 013import org.openstreetmap.josm.Main; 014 015/** 016 * @author Wiktor Niesiobędzki 017 * 018 * Queue for ThreadPoolExecutor that implements per-host limit. It will acquire a semaphore for each task 019 * and it will set a runnable task with semaphore release, when job has finished. 020 * 021 * This implementation doesn't guarantee to have at most hostLimit connections per host[1], and it doesn't 022 * guarantee that all threads will be busy, when there is work for them[2]. 023 * 024 * [1] More connection per host may happen, when ThreadPoolExecutor is growing its pool, and thus 025 * tasks do not go through the Queue 026 * [2] If we have a queue, and for all hosts in queue we will fail to acquire semaphore, the thread 027 * take the first available job and wait for semaphore. It might be the case, that semaphore was released 028 * for some task further in queue, but this implementation doesn't try to detect such situation 029 * 030 * 031 */ 032public class HostLimitQueue extends LinkedBlockingDeque<Runnable> { 033 private static final long serialVersionUID = 1L; 034 035 private final Map<String, Semaphore> hostSemaphores = new ConcurrentHashMap<>(); 036 private final int hostLimit; 037 038 /** 039 * Creates an unbounded queue 040 * @param hostLimit how many parallel calls to host to allow 041 */ 042 public HostLimitQueue(int hostLimit) { 043 super(); // create unbounded queue 044 this.hostLimit = hostLimit; 045 } 046 047 private JCSCachedTileLoaderJob<?, ?> findJob() { 048 for (Iterator<Runnable> it = iterator(); it.hasNext();) { 049 Runnable r = it.next(); 050 if (r instanceof JCSCachedTileLoaderJob) { 051 JCSCachedTileLoaderJob<?, ?> job = (JCSCachedTileLoaderJob<?, ?>) r; 052 if (tryAcquireSemaphore(job)) { 053 if (remove(job)) { 054 return job; 055 } else { 056 // we have acquired the semaphore, but we didn't manage to remove job, as someone else did 057 // release the semaphore and look for another candidate 058 releaseSemaphore(job); 059 } 060 } else { 061 URL url = null; 062 try { 063 url = job.getUrl(); 064 } catch (IOException e) { 065 if (Main.isDebugEnabled()) { 066 Main.debug(e.getMessage()); 067 } 068 } 069 Main.debug("TMS - Skipping job {0} because host limit reached", url); 070 } 071 } 072 } 073 return null; 074 } 075 076 @Override 077 public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException { 078 Runnable job = findJob(); 079 if (job != null) { 080 return job; 081 } 082 job = pollFirst(timeout, unit); 083 if (job != null) { 084 acquireSemaphore(job); 085 } 086 return job; 087 } 088 089 @Override 090 public Runnable take() throws InterruptedException { 091 Runnable job = findJob(); 092 if (job != null) { 093 return job; 094 } 095 job = takeFirst(); 096 if (job != null) { 097 acquireSemaphore(job); 098 } 099 return job; 100 } 101 102 private Semaphore getSemaphore(JCSCachedTileLoaderJob<?, ?> job) { 103 String host; 104 try { 105 host = job.getUrl().getHost(); 106 } catch (IOException e) { 107 // do not pass me illegal URL's 108 throw new IllegalArgumentException(e); 109 } 110 Semaphore limit = hostSemaphores.get(host); 111 if (limit == null) { 112 synchronized (hostSemaphores) { 113 limit = hostSemaphores.get(host); 114 if (limit == null) { 115 limit = new Semaphore(hostLimit); 116 hostSemaphores.put(host, limit); 117 } 118 } 119 } 120 return limit; 121 } 122 123 private void acquireSemaphore(Runnable job) throws InterruptedException { 124 if (job instanceof JCSCachedTileLoaderJob) { 125 final JCSCachedTileLoaderJob<?, ?> jcsJob = (JCSCachedTileLoaderJob<?, ?>) job; 126 Semaphore limit = getSemaphore(jcsJob); 127 if (limit != null) { 128 limit.acquire(); 129 jcsJob.setFinishedTask(new Runnable() { 130 @Override 131 public void run() { 132 releaseSemaphore(jcsJob); 133 } 134 }); 135 } 136 } 137 } 138 139 private boolean tryAcquireSemaphore(final JCSCachedTileLoaderJob<?, ?> job) { 140 boolean ret = true; 141 Semaphore limit = getSemaphore(job); 142 if (limit != null) { 143 ret = limit.tryAcquire(); 144 if (ret) { 145 job.setFinishedTask(new Runnable() { 146 @Override 147 public void run() { 148 releaseSemaphore(job); 149 } 150 }); 151 } 152 } 153 return ret; 154 } 155 156 private void releaseSemaphore(JCSCachedTileLoaderJob<?, ?> job) { 157 Semaphore limit = getSemaphore(job); 158 if (limit != null) { 159 limit.release(); 160 if (limit.availablePermits() > hostLimit) { 161 Main.warn("More permits than it should be"); 162 } 163 } 164 } 165}