// License: GPL. For details, see LICENSE file.
package org.openstreetmap.josm.data.cache;

import java.io.IOException;
import java.net.URL;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

import org.openstreetmap.josm.Main;

/**
 * Queue for ThreadPoolExecutor that implements per-host limit. It will acquire a semaphore for each task
 * and it will set a runnable task with semaphore release, when job has finished.
 * <p>
 * This implementation doesn't guarantee to have at most hostLimit connections per host[1], and it doesn't
 * guarantee that all threads will be busy, when there is work for them[2]. <br>
 * [1] More connections per host may happen, when ThreadPoolExecutor is growing its pool, and thus
 *     tasks do not go through the Queue <br>
 * [2] If we have a queue, and for all hosts in queue we will fail to acquire semaphore, the thread
 *     takes the first available job and waits for semaphore. It might be the case, that semaphore was released
 *     for some task further in queue, but this implementation doesn't try to detect such situation
 *
 * @author Wiktor Niesiobędzki
 */
public class HostLimitQueue extends LinkedBlockingDeque<Runnable> {
    private static final long serialVersionUID = 1L;

    /** One semaphore per host name, created lazily on first use of that host. */
    private final Map<String, Semaphore> hostSemaphores = new ConcurrentHashMap<>();
    /** Number of permits every per-host semaphore is created with. */
    private final int hostLimit;

    /**
     * Creates an unbounded queue
     * @param hostLimit how many parallel calls to host to allow
     */
    public HostLimitQueue(int hostLimit) {
        super(); // create unbounded queue
        this.hostLimit = hostLimit;
    }

    /**
     * Scans the queue for the first tile loader job whose host semaphore can be acquired
     * without blocking, and removes that job from the queue.
     *
     * @return the removed job, holding a permit of its host semaphore, or {@code null} if no
     *         queued job could get a permit right now
     */
    private JCSCachedTileLoaderJob<?, ?> findJob() {
        for (Runnable r : this) {
            if (!(r instanceof JCSCachedTileLoaderJob)) {
                continue;
            }
            JCSCachedTileLoaderJob<?, ?> job = (JCSCachedTileLoaderJob<?, ?>) r;
            if (tryAcquireSemaphore(job)) {
                if (remove(job)) {
                    return job;
                }
                // we have acquired the semaphore, but we didn't manage to remove job, as someone else did
                // release the semaphore and look for another candidate
                releaseSemaphore(job);
            } else {
                URL url = null;
                try {
                    url = job.getUrl();
                } catch (IOException e) {
                    Main.debug(e);
                }
                Main.debug("TMS - Skipping job {0} because host limit reached", url);
            }
        }
        return null;
    }

    /**
     * Retrieves a job, preferring one whose host limit is not yet reached. If there is none,
     * waits up to the given timeout for the head of the queue and then blocks until a permit
     * for its host becomes available.
     *
     * @param timeout how long to wait for a job to appear in the queue
     * @param unit unit of {@code timeout}
     * @return the job to run, or {@code null} if the timeout elapsed before a job was available
     * @throws InterruptedException if interrupted while waiting; a job that has already been
     *         taken from the queue is put back before the exception is propagated
     */
    @Override
    public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
        Runnable job = findJob();
        if (job != null) {
            return job;
        }
        job = pollFirst(timeout, unit);
        if (job != null) {
            acquireSemaphoreOrRequeue(job);
        }
        return job;
    }

    /**
     * Retrieves a job, preferring one whose host limit is not yet reached. If there is none,
     * waits for the head of the queue and then blocks until a permit for its host becomes available.
     *
     * @return the job to run
     * @throws InterruptedException if interrupted while waiting; a job that has already been
     *         taken from the queue is put back before the exception is propagated
     */
    @Override
    public Runnable take() throws InterruptedException {
        Runnable job = findJob();
        if (job != null) {
            return job;
        }
        job = takeFirst();
        if (job != null) {
            acquireSemaphoreOrRequeue(job);
        }
        return job;
    }

    /**
     * Acquires the host semaphore for a job that was already removed from the queue.
     * If the wait for the permit is interrupted, the job is re-inserted at the head of the
     * queue, because otherwise it would be silently dropped: it is no longer queued and it is
     * never handed to the caller.
     *
     * @param job job that has just been removed from the head of the queue
     * @throws InterruptedException if interrupted while waiting for the permit
     */
    private void acquireSemaphoreOrRequeue(Runnable job) throws InterruptedException {
        try {
            acquireSemaphore(job);
        } catch (InterruptedException e) {
            // queue is unbounded, so addFirst can not fail for capacity reasons
            addFirst(job);
            throw e;
        }
    }

    /**
     * Returns the semaphore limiting parallel jobs for the host of the given job, creating it
     * with {@code hostLimit} permits when the host is seen for the first time.
     *
     * @param job job whose URL determines the host
     * @return semaphore for the job's host, never {@code null}
     * @throws IllegalArgumentException if the URL of the job can not be obtained
     */
    private Semaphore getSemaphore(JCSCachedTileLoaderJob<?, ?> job) {
        String host;
        try {
            host = job.getUrl().getHost();
        } catch (IOException e) {
            // do not pass me illegal URL's
            throw new IllegalArgumentException(e);
        }
        // lock-free fast path for hosts that are already known
        Semaphore limit = hostSemaphores.get(host);
        if (limit == null) {
            // atomic create-if-missing, concurrent callers get the same instance
            limit = hostSemaphores.computeIfAbsent(host, k -> new Semaphore(hostLimit));
        }
        return limit;
    }

    /**
     * Blocks until a permit of the job's host semaphore is acquired and registers a finished
     * task on the job that releases the permit. Runnables that are not tile loader jobs are ignored.
     *
     * @param job job to acquire the permit for
     * @throws InterruptedException if interrupted while waiting for the permit
     */
    private void acquireSemaphore(Runnable job) throws InterruptedException {
        if (job instanceof JCSCachedTileLoaderJob) {
            final JCSCachedTileLoaderJob<?, ?> jcsJob = (JCSCachedTileLoaderJob<?, ?>) job;
            getSemaphore(jcsJob).acquire();
            jcsJob.setFinishedTask(() -> releaseSemaphore(jcsJob));
        }
    }

    /**
     * Tries to acquire a permit of the job's host semaphore without blocking. On success
     * registers a finished task on the job that releases the permit.
     *
     * @param job job to acquire the permit for
     * @return {@code true} if the permit was acquired
     */
    private boolean tryAcquireSemaphore(final JCSCachedTileLoaderJob<?, ?> job) {
        boolean acquired = getSemaphore(job).tryAcquire();
        if (acquired) {
            job.setFinishedTask(() -> releaseSemaphore(job));
        }
        return acquired;
    }

    /**
     * Releases one permit of the job's host semaphore and warns if this leaves more permits
     * available than the configured host limit.
     *
     * @param job job whose permit is given back
     */
    private void releaseSemaphore(JCSCachedTileLoaderJob<?, ?> job) {
        Semaphore limit = getSemaphore(job);
        limit.release();
        if (limit.availablePermits() > hostLimit) {
            Main.warn("More permits than it should be");
        }
    }
}