001// License: GPL. For details, see LICENSE file.
002package org.openstreetmap.josm.data.cache;
003
004import java.io.IOException;
005import java.net.URL;
006import java.util.Iterator;
007import java.util.Map;
008import java.util.concurrent.ConcurrentHashMap;
009import java.util.concurrent.LinkedBlockingDeque;
010import java.util.concurrent.Semaphore;
011import java.util.concurrent.TimeUnit;
012
013import org.openstreetmap.josm.Main;
014
015/**
016 * Queue for ThreadPoolExecutor that implements per-host limit. It will acquire a semaphore for each task
017 * and it will set a runnable task with semaphore release, when job has finished.
018 * <p>
019 * This implementation doesn't guarantee to have at most hostLimit connections per host[1], and it doesn't
020 * guarantee that all threads will be busy, when there is work for them[2]. <br>
021 * [1] More connection per host may happen, when ThreadPoolExecutor is growing its pool, and thus
022 *     tasks do not go through the Queue <br>
023 * [2] If we have a queue, and for all hosts in queue we will fail to acquire semaphore, the thread
024 *     take the first available job and wait for semaphore. It might be the case, that semaphore was released
025 *     for some task further in queue, but this implementation doesn't try to detect such situation
026 *
027 * @author Wiktor Niesiobędzki
028 */
029public class HostLimitQueue extends LinkedBlockingDeque<Runnable> {
030    private static final long serialVersionUID = 1L;
031
032    private final Map<String, Semaphore> hostSemaphores = new ConcurrentHashMap<>();
033    private final int hostLimit;
034
035    /**
036     * Creates an unbounded queue
037     * @param hostLimit how many parallel calls to host to allow
038     */
039    public HostLimitQueue(int hostLimit) {
040        super(); // create unbounded queue
041        this.hostLimit = hostLimit;
042    }
043
044    private JCSCachedTileLoaderJob<?, ?> findJob() {
045        for (Iterator<Runnable> it = iterator(); it.hasNext();) {
046            Runnable r = it.next();
047            if (r instanceof JCSCachedTileLoaderJob) {
048                JCSCachedTileLoaderJob<?, ?> job = (JCSCachedTileLoaderJob<?, ?>) r;
049                if (tryAcquireSemaphore(job)) {
050                    if (remove(job)) {
051                        return job;
052                    } else {
053                        // we have acquired the semaphore, but we didn't manage to remove job, as someone else did
054                        // release the semaphore and look for another candidate
055                        releaseSemaphore(job);
056                    }
057                } else {
058                    URL url = null;
059                    try {
060                        url = job.getUrl();
061                    } catch (IOException e) {
062                        Main.debug(e);
063                    }
064                    Main.debug("TMS - Skipping job {0} because host limit reached", url);
065                }
066            }
067        }
068        return null;
069    }
070
071    @Override
072    public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
073        Runnable job = findJob();
074        if (job != null) {
075            return job;
076        }
077        job = pollFirst(timeout, unit);
078        if (job != null) {
079            acquireSemaphore(job);
080        }
081        return job;
082    }
083
084    @Override
085    public Runnable take() throws InterruptedException {
086        Runnable job = findJob();
087        if (job != null) {
088            return job;
089        }
090        job = takeFirst();
091        if (job != null) {
092            acquireSemaphore(job);
093        }
094        return job;
095    }
096
097    private Semaphore getSemaphore(JCSCachedTileLoaderJob<?, ?> job) {
098        String host;
099        try {
100            host = job.getUrl().getHost();
101        } catch (IOException e) {
102            // do not pass me illegal URL's
103            throw new IllegalArgumentException(e);
104        }
105        Semaphore limit = hostSemaphores.get(host);
106        if (limit == null) {
107            synchronized (hostSemaphores) {
108                limit = hostSemaphores.get(host);
109                if (limit == null) {
110                    limit = new Semaphore(hostLimit);
111                    hostSemaphores.put(host, limit);
112                }
113            }
114        }
115        return limit;
116    }
117
118    private void acquireSemaphore(Runnable job) throws InterruptedException {
119        if (job instanceof JCSCachedTileLoaderJob) {
120            final JCSCachedTileLoaderJob<?, ?> jcsJob = (JCSCachedTileLoaderJob<?, ?>) job;
121            Semaphore limit = getSemaphore(jcsJob);
122            if (limit != null) {
123                limit.acquire();
124                jcsJob.setFinishedTask(() -> releaseSemaphore(jcsJob));
125            }
126        }
127    }
128
129    private boolean tryAcquireSemaphore(final JCSCachedTileLoaderJob<?, ?> job) {
130        boolean ret = true;
131        Semaphore limit = getSemaphore(job);
132        if (limit != null) {
133            ret = limit.tryAcquire();
134            if (ret) {
135                job.setFinishedTask(() -> releaseSemaphore(job));
136            }
137        }
138        return ret;
139    }
140
141    private void releaseSemaphore(JCSCachedTileLoaderJob<?, ?> job) {
142        Semaphore limit = getSemaphore(job);
143        if (limit != null) {
144            limit.release();
145            if (limit.availablePermits() > hostLimit) {
146                Main.warn("More permits than it should be");
147            }
148        }
149    }
150}