001// License: GPL. For details, see LICENSE file.
002package org.openstreetmap.josm.data.cache;
003
004import java.io.IOException;
005import java.net.URL;
006import java.util.Iterator;
007import java.util.Map;
008import java.util.concurrent.ConcurrentHashMap;
009import java.util.concurrent.LinkedBlockingDeque;
010import java.util.concurrent.Semaphore;
011import java.util.concurrent.TimeUnit;
012
013import org.openstreetmap.josm.Main;
014
015/**
016 * @author Wiktor Niesiobędzki
017 *
018 * Queue for ThreadPoolExecutor that implements per-host limit. It will acquire a semaphore for each task
019 * and it will set a runnable task with semaphore release, when job has finished.
020 *
021 * This implementation doesn't guarantee to have at most hostLimit connections per host[1], and it doesn't
022 * guarantee that all threads will be busy, when there is work for them[2].
023 *
024 * [1] More connection per host may happen, when ThreadPoolExecutor is growing its pool, and thus
025 *     tasks do not go through the Queue
026 * [2] If we have a queue, and for all hosts in queue we will fail to acquire semaphore, the thread
027 *     take the first available job and wait for semaphore. It might be the case, that semaphore was released
028 *     for some task further in queue, but this implementation doesn't try to detect such situation
029 *
030 *
031 */
032public class HostLimitQueue extends LinkedBlockingDeque<Runnable> {
033    private static final long serialVersionUID = 1L;
034
035    private final Map<String, Semaphore> hostSemaphores = new ConcurrentHashMap<>();
036    private final int hostLimit;
037
038    /**
039     * Creates an unbounded queue
040     * @param hostLimit how many parallel calls to host to allow
041     */
042    public HostLimitQueue(int hostLimit) {
043        super(); // create unbounded queue
044        this.hostLimit = hostLimit;
045    }
046
047    private JCSCachedTileLoaderJob<?, ?> findJob() {
048        for (Iterator<Runnable> it = iterator(); it.hasNext();) {
049            Runnable r = it.next();
050            if (r instanceof JCSCachedTileLoaderJob) {
051                JCSCachedTileLoaderJob<?, ?> job = (JCSCachedTileLoaderJob<?, ?>) r;
052                if (tryAcquireSemaphore(job)) {
053                    if (remove(job)) {
054                        return job;
055                    } else {
056                        // we have acquired the semaphore, but we didn't manage to remove job, as someone else did
057                        // release the semaphore and look for another candidate
058                        releaseSemaphore(job);
059                    }
060                } else {
061                    URL url = null;
062                    try {
063                        url = job.getUrl();
064                    } catch (IOException e) {
065                        if (Main.isDebugEnabled()) {
066                            Main.debug(e.getMessage());
067                        }
068                    }
069                    Main.debug("TMS - Skipping job {0} because host limit reached", url);
070                }
071            }
072        }
073        return null;
074    }
075
076    @Override
077    public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
078        Runnable job = findJob();
079        if (job != null) {
080            return job;
081        }
082        job = pollFirst(timeout, unit);
083        if (job != null) {
084            acquireSemaphore(job);
085        }
086        return job;
087    }
088
089    @Override
090    public Runnable take() throws InterruptedException {
091        Runnable job = findJob();
092        if (job != null) {
093            return job;
094        }
095        job = takeFirst();
096        if (job != null) {
097            acquireSemaphore(job);
098        }
099        return job;
100    }
101
102    private  Semaphore getSemaphore(JCSCachedTileLoaderJob<?, ?> job) {
103        String host;
104        try {
105            host = job.getUrl().getHost();
106        } catch (IOException e) {
107            // do not pass me illegal URL's
108            throw new IllegalArgumentException(e);
109        }
110        Semaphore limit = hostSemaphores.get(host);
111        if (limit == null) {
112            synchronized (hostSemaphores) {
113                limit = hostSemaphores.get(host);
114                if (limit == null) {
115                    limit = new Semaphore(hostLimit);
116                    hostSemaphores.put(host, limit);
117                }
118            }
119        }
120        return limit;
121    }
122
123    private void acquireSemaphore(Runnable job) throws InterruptedException {
124        if (job instanceof JCSCachedTileLoaderJob) {
125            final JCSCachedTileLoaderJob<?, ?> jcsJob = (JCSCachedTileLoaderJob<?, ?>) job;
126            Semaphore limit = getSemaphore(jcsJob);
127            if (limit != null) {
128                limit.acquire();
129                jcsJob.setFinishedTask(new Runnable() {
130                    @Override
131                    public void run() {
132                        releaseSemaphore(jcsJob);
133                    }
134                });
135            }
136        }
137    }
138
139    private boolean tryAcquireSemaphore(final JCSCachedTileLoaderJob<?, ?> job) {
140        boolean ret = true;
141        Semaphore limit = getSemaphore(job);
142        if (limit != null) {
143            ret = limit.tryAcquire();
144            if (ret) {
145                job.setFinishedTask(new Runnable() {
146                    @Override
147                    public void run() {
148                        releaseSemaphore(job);
149                    }
150                });
151            }
152        }
153        return ret;
154    }
155
156    private void releaseSemaphore(JCSCachedTileLoaderJob<?, ?> job) {
157        Semaphore limit = getSemaphore(job);
158        if (limit != null) {
159            limit.release();
160            if (limit.availablePermits() > hostLimit) {
161                Main.warn("More permits than it should be");
162            }
163        }
164    }
165}