001// License: GPL. For details, see Readme.txt file.
002package org.openstreetmap.gui.jmapviewer;
003
004import java.util.concurrent.BlockingDeque;
005import java.util.concurrent.LinkedBlockingDeque;
006import java.util.concurrent.TimeUnit;
007
008import org.openstreetmap.gui.jmapviewer.interfaces.TileJob;
009
010/**
011 * A generic class that processes a list of {@link Runnable} one-by-one using
012 * one or more {@link Thread}-instances. The number of instances varies between
013 * 1 and {@link #workerThreadMaxCount} (default: 8). If an instance is idle
014 * more than {@link #workerThreadTimeout} seconds (default: 30), the instance
015 * ends itself.
016 *
017 * @author Jan Peter Stotz
018 */
019public class JobDispatcher {
020
021    private static final JobDispatcher instance = new JobDispatcher();
022
023    /**
024     * @return the singelton instance of the {@link JobDispatcher}
025     */
026    public static JobDispatcher getInstance() {
027        return instance;
028    }
029
030    private JobDispatcher() {
031        addWorkerThread().firstThread = true;
032    }
033
034    protected BlockingDeque<TileJob> jobQueue = new LinkedBlockingDeque<>();
035
036    protected static int workerThreadMaxCount = 8;
037
038    /**
039     * Specifies the time span in seconds that a worker thread waits for new
040     * jobs to perform. If the time span has elapsed the worker thread
041     * terminates itself. Only the first worker thread works differently, it
042     * ignores the timeout and will never terminate itself.
043     */
044    protected static int workerThreadTimeout = 30;
045
046    /**
047     * Type of queue, FIFO if <code>false</code>, LIFO if <code>true</code>
048     */
049    protected boolean modeLIFO = false;
050
051    /**
052     * Total number of worker threads currently idle or active
053     */
054    protected int workerThreadCount = 0;
055
056    /**
057     * Number of worker threads currently idle
058     */
059    protected int workerThreadIdleCount = 0;
060
061    /**
062     * Just an id for identifying an worker thread instance
063     */
064    protected int workerThreadId = 0;
065
066    /**
067     * Removes all jobs from the queue that are currently not being processed.
068     */
069    public void cancelOutstandingJobs() {
070        jobQueue.clear();
071    }
072
073    /**
074     * Function to set the maximum number of workers for tile loading.
075     */
076    static public void setMaxWorkers(int workers) {
077        workerThreadMaxCount = workers;
078    }
079
080    /**
081     * Function to set the LIFO/FIFO mode for tile loading job.
082     *
083     * @param lifo <code>true</code> for LIFO mode, <code>false</code> for FIFO mode
084     */
085    public void setLIFO(boolean lifo) {
086        modeLIFO = lifo;
087    }
088
089    /**
090     * Adds a job to the queue.
091     * Jobs for tiles already contained in the are ignored (using a <code>null</code> tile
092     * prevents skipping).
093     *
094     * @param job the the job to be added
095     */
096    public void addJob(TileJob job) {
097        try {
098            if(job.getTile() != null) {
099                for(TileJob oldJob : jobQueue) {
100                    if(oldJob.getTile() == job.getTile()) {
101                        return;
102                    }
103                }
104            }
105            jobQueue.put(job);
106            if (workerThreadIdleCount == 0 && workerThreadCount < workerThreadMaxCount)
107                addWorkerThread();
108        } catch (InterruptedException e) {
109        }
110    }
111
112    protected JobThread addWorkerThread() {
113        JobThread jobThread = new JobThread(++workerThreadId);
114        synchronized (this) {
115            workerThreadCount++;
116        }
117        jobThread.start();
118        return jobThread;
119    }
120
121    public class JobThread extends Thread {
122
123        Runnable job;
124        boolean firstThread = false;
125
126        public JobThread(int threadId) {
127            super("OSMJobThread " + threadId);
128            setDaemon(true);
129            job = null;
130        }
131
132        @Override
133        public void run() {
134            executeJobs();
135            synchronized (instance) {
136                workerThreadCount--;
137            }
138        }
139
140        protected void executeJobs() {
141            while (!isInterrupted()) {
142                try {
143                    synchronized (instance) {
144                        workerThreadIdleCount++;
145                    }
146                    if(modeLIFO) {
147                        if (firstThread)
148                            job = jobQueue.takeLast();
149                        else
150                            job = jobQueue.pollLast(workerThreadTimeout, TimeUnit.SECONDS);
151                    } else {
152                        if (firstThread)
153                            job = jobQueue.take();
154                        else
155                            job = jobQueue.poll(workerThreadTimeout, TimeUnit.SECONDS);
156                    }
157                } catch (InterruptedException e1) {
158                    return;
159                } finally {
160                    synchronized (instance) {
161                        workerThreadIdleCount--;
162                    }
163                }
164                if (job == null)
165                    return;
166                try {
167                    job.run();
168                    job = null;
169                } catch (Exception e) {
170                    e.printStackTrace();
171                }
172            }
173        }
174    }
175
176}