1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package org.htmlunit.javascript.background;
16
17 import java.io.IOException;
18 import java.io.ObjectInputStream;
19 import java.lang.ref.WeakReference;
20 import java.util.ArrayList;
21 import java.util.PriorityQueue;
22 import java.util.concurrent.atomic.AtomicInteger;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.htmlunit.Page;
27 import org.htmlunit.WebWindow;
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42 class JavaScriptJobManagerImpl implements JavaScriptJobManager {
43
44
45
46
47
48 private final transient WeakReference<WebWindow> window_;
49
50
51
52
53
54 private transient PriorityQueue<JavaScriptJob> scheduledJobsQ_ = new PriorityQueue<>();
55
56 private transient ArrayList<Integer> cancelledJobs_ = new ArrayList<>();
57
58 private transient JavaScriptJob currentlyRunningJob_;
59
60
61 private static final AtomicInteger NEXT_JOB_ID_ = new AtomicInteger(1);
62
63
64 private static final Log LOG = LogFactory.getLog(JavaScriptJobManagerImpl.class);
65
66
67
68
69
70
71 JavaScriptJobManagerImpl(final WebWindow window) {
72 window_ = new WeakReference<>(window);
73 }
74
75
76 @Override
77 public synchronized int getJobCount() {
78 return scheduledJobsQ_.size() + (currentlyRunningJob_ != null ? 1 : 0);
79 }
80
81
82 @Override
83 public synchronized int getJobCount(final JavaScriptJobFilter filter) {
84 if (filter == null) {
85 return scheduledJobsQ_.size() + (currentlyRunningJob_ != null ? 1 : 0);
86 }
87
88 int count = 0;
89 if (currentlyRunningJob_ != null && filter.passes(currentlyRunningJob_)) {
90 count++;
91 }
92 for (final JavaScriptJob job : scheduledJobsQ_) {
93 if (filter.passes(job)) {
94 count++;
95 }
96 }
97 return count;
98 }
99
100
101 @Override
102 public int addJob(final JavaScriptJob job, final Page page) {
103 final WebWindow w = getWindow();
104 if (w == null) {
105
106
107
108
109 return 0;
110 }
111 if (w.getEnclosedPage() != page) {
112
113
114
115
116 return 0;
117 }
118 final int id = NEXT_JOB_ID_.getAndIncrement();
119 job.setId(Integer.valueOf(id));
120
121 synchronized (this) {
122 scheduledJobsQ_.add(job);
123
124 if (LOG.isDebugEnabled()) {
125 LOG.debug("job added to queue");
126 LOG.debug(" window is: " + w);
127 LOG.debug(" added job: " + job);
128 LOG.debug("after adding job to the queue, the queue is: ");
129 printQueue();
130 }
131
132 notify();
133 }
134
135 return id;
136 }
137
138
139 @Override
140 public synchronized void removeJob(final int id) {
141 for (final JavaScriptJob job : scheduledJobsQ_) {
142 final int jobId = job.getId().intValue();
143 if (jobId == id) {
144 scheduledJobsQ_.remove(job);
145 break;
146 }
147 }
148 cancelledJobs_.add(Integer.valueOf(id));
149 notify();
150 }
151
152
153 @Override
154 public synchronized void stopJob(final int id) {
155 for (final JavaScriptJob job : scheduledJobsQ_) {
156 final int jobId = job.getId().intValue();
157 if (jobId == id) {
158 scheduledJobsQ_.remove(job);
159
160 break;
161 }
162 }
163 cancelledJobs_.add(Integer.valueOf(id));
164 notify();
165 }
166
167
168 @Override
169 public synchronized void removeAllJobs() {
170 if (currentlyRunningJob_ != null) {
171 cancelledJobs_.add(currentlyRunningJob_.getId());
172 }
173 for (final JavaScriptJob job : scheduledJobsQ_) {
174 cancelledJobs_.add(job.getId());
175 }
176 scheduledJobsQ_.clear();
177 notify();
178 }
179
180
181 @Override
182 @SuppressWarnings("PMD.GuardLogStatement")
183 public int waitForJobs(final long timeoutMillis) {
184 final boolean debug = LOG.isDebugEnabled();
185 if (debug) {
186 LOG.debug("Waiting for all jobs to finish (will wait max " + timeoutMillis + " millis).");
187 }
188 if (timeoutMillis > 0) {
189 long now = System.currentTimeMillis();
190 final long end = now + timeoutMillis;
191
192 synchronized (this) {
193 while (getJobCount() > 0 && now < end) {
194 try {
195 wait(end - now);
196 }
197 catch (final InterruptedException e) {
198 LOG.error("InterruptedException while in waitForJobs", e);
199
200
201 Thread.currentThread().interrupt();
202 }
203
204
205 now = System.currentTimeMillis();
206 }
207 }
208 }
209 final int jobs = getJobCount();
210 if (debug) {
211 LOG.debug("Finished waiting for all jobs to finish (final job count is " + jobs + ").");
212 }
213 return jobs;
214 }
215
216
217 @Override
218 public int waitForJobsStartingBefore(final long delayMillis) {
219 return waitForJobsStartingBefore(delayMillis, null);
220 }
221
222
223 @Override
224 @SuppressWarnings("PMD.GuardLogStatement")
225 public int waitForJobsStartingBefore(final long delayMillis, final JavaScriptJobFilter filter) {
226 final boolean debug = LOG.isDebugEnabled();
227
228 final long latestExecutionTime = System.currentTimeMillis() + delayMillis;
229 if (debug) {
230 LOG.debug("Waiting for all jobs that have execution time before "
231 + delayMillis + " (" + latestExecutionTime + ") to finish");
232 }
233
234 final long interval = Math.max(40, delayMillis);
235 synchronized (this) {
236 JavaScriptJob earliestJob = getEarliestJob(filter);
237 boolean pending = earliestJob != null && earliestJob.getTargetExecutionTime() < latestExecutionTime;
238 pending = pending
239 || (
240 currentlyRunningJob_ != null
241 && (filter == null || filter.passes(currentlyRunningJob_))
242 && currentlyRunningJob_.getTargetExecutionTime() < latestExecutionTime
243 );
244
245 while (pending) {
246 try {
247 wait(interval);
248 }
249 catch (final InterruptedException e) {
250 LOG.error("InterruptedException while in waitForJobsStartingBefore", e);
251
252
253 Thread.currentThread().interrupt();
254 }
255
256 earliestJob = getEarliestJob(filter);
257 pending = earliestJob != null && earliestJob.getTargetExecutionTime() < latestExecutionTime;
258 pending = pending
259 || (
260 currentlyRunningJob_ != null
261 && (filter == null || filter.passes(currentlyRunningJob_))
262 && currentlyRunningJob_.getTargetExecutionTime() < latestExecutionTime
263 );
264 }
265 }
266
267 final int jobs = getJobCount(filter);
268 if (debug) {
269 LOG.debug("Finished waiting for all jobs that have target execution time earlier than "
270 + latestExecutionTime + ", final job count is " + jobs);
271 }
272 return jobs;
273 }
274
275
276 @Override
277 public synchronized void shutdown() {
278 scheduledJobsQ_.clear();
279 notify();
280 }
281
282
283
284
285
286
287
288
289 private WebWindow getWindow() {
290 return window_.get();
291 }
292
293
294
295
296 private void printQueue() {
297 if (LOG.isDebugEnabled()) {
298 LOG.debug("------ printing JavaScript job queue -----");
299 LOG.debug(" number of jobs on the queue: " + scheduledJobsQ_.size());
300 int count = 1;
301 for (final JavaScriptJob job : scheduledJobsQ_) {
302 LOG.debug(" " + count + ") Job target execution time: " + job.getTargetExecutionTime());
303 LOG.debug(" job to string: " + job);
304 LOG.debug(" job id: " + job.getId());
305 if (job.isPeriodic()) {
306 LOG.debug(" period: " + job.getPeriod().intValue());
307 }
308 count++;
309 }
310 LOG.debug("------------------------------------------");
311 }
312 }
313
314
315
316
317 @Override
318 public synchronized String jobStatusDump(final JavaScriptJobFilter filter) {
319 final String lineSeparator = System.lineSeparator();
320
321 final StringBuilder status = new StringBuilder(110)
322 .append("------ JavaScript job status -----")
323 .append(lineSeparator);
324
325 if (null != currentlyRunningJob_ && (filter == null || filter.passes(currentlyRunningJob_))) {
326 status.append(" current running job: ").append(currentlyRunningJob_.toString())
327 .append(" job id: ").append(currentlyRunningJob_.getId())
328 .append(lineSeparator)
329 .append(lineSeparator)
330 .append(lineSeparator);
331 }
332 status.append(" number of jobs on the queue: ")
333 .append(scheduledJobsQ_.size())
334 .append(lineSeparator);
335
336 int count = 1;
337 for (final JavaScriptJob job : scheduledJobsQ_) {
338 if (filter == null || filter.passes(job)) {
339 final long now = System.currentTimeMillis();
340 final long execTime = job.getTargetExecutionTime();
341 status.append(" ").append(count).append(") Job target execution time: ")
342 .append(execTime).append(" (should start in ")
343 .append((execTime - now) / 1000d).append("s)")
344 .append(lineSeparator)
345 .append(" job to string: ").append(job)
346 .append(lineSeparator).append(" job id: ").append(job.getId())
347 .append(lineSeparator);
348 if (job.isPeriodic()) {
349 status.append(" period: ")
350 .append(job.getPeriod().toString())
351 .append(lineSeparator);
352 }
353 count++;
354 }
355 }
356 status.append("------------------------------------------")
357 .append(lineSeparator);
358
359 return status.toString();
360 }
361
362
363
364
365 @Override
366 public JavaScriptJob getEarliestJob() {
367 return scheduledJobsQ_.peek();
368 }
369
370
371
372
373 @Override
374 public synchronized JavaScriptJob getEarliestJob(final JavaScriptJobFilter filter) {
375 if (filter == null) {
376 return scheduledJobsQ_.peek();
377 }
378
379 for (final JavaScriptJob job : scheduledJobsQ_) {
380 if (filter.passes(job)) {
381 return job;
382 }
383 }
384 return null;
385 }
386
387
388
389
390 @Override
391 @SuppressWarnings("PMD.GuardLogStatement")
392 public boolean runSingleJob(final JavaScriptJob givenJob) {
393 assert givenJob != null;
394 final JavaScriptJob job = getEarliestJob();
395 if (job != givenJob) {
396 return false;
397 }
398
399 final long currentTime = System.currentTimeMillis();
400 if (job.getTargetExecutionTime() > currentTime) {
401 return false;
402 }
403 synchronized (this) {
404 if (scheduledJobsQ_.remove(job)) {
405 currentlyRunningJob_ = job;
406 }
407
408 }
409
410 final boolean debug = LOG.isDebugEnabled();
411 final boolean isPeriodicJob = job.isPeriodic();
412 if (isPeriodicJob) {
413 final long jobPeriod = job.getPeriod().longValue();
414
415
416 long timeDifference = currentTime - job.getTargetExecutionTime();
417 timeDifference = (timeDifference / jobPeriod) * jobPeriod + jobPeriod;
418 job.setTargetExecutionTime(job.getTargetExecutionTime() + timeDifference);
419
420
421 synchronized (this) {
422 if (!cancelledJobs_.contains(job.getId())) {
423 if (debug) {
424 LOG.debug("Reschedulling job " + job);
425 }
426 scheduledJobsQ_.add(job);
427 notify();
428 }
429 }
430 }
431 if (debug) {
432 final String periodicJob = isPeriodicJob ? "interval " : "";
433 LOG.debug("Starting " + periodicJob + "job " + job);
434 }
435 try {
436 job.run();
437 }
438 catch (final RuntimeException e) {
439 LOG.error("Job run failed with unexpected RuntimeException: " + e.getMessage(), e);
440 }
441 finally {
442 synchronized (this) {
443 if (job == currentlyRunningJob_) {
444 currentlyRunningJob_ = null;
445 }
446 notify();
447 }
448 }
449 if (debug) {
450 final String periodicJob = isPeriodicJob ? "interval " : "";
451 LOG.debug("Finished " + periodicJob + "job " + job);
452 }
453 return true;
454 }
455
456
457
458
459
460
461
462 private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
463 in.defaultReadObject();
464
465
466 scheduledJobsQ_ = new PriorityQueue<>();
467 cancelledJobs_ = new ArrayList<>();
468 currentlyRunningJob_ = null;
469 }
470 }