1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package org.htmlunit.javascript.background;
16
17 import java.io.IOException;
18 import java.io.ObjectInputStream;
19 import java.lang.ref.WeakReference;
20 import java.util.ArrayList;
21 import java.util.PriorityQueue;
22 import java.util.concurrent.atomic.AtomicInteger;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.htmlunit.Page;
27 import org.htmlunit.WebWindow;
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42 class JavaScriptJobManagerImpl implements JavaScriptJobManager {
43
44
45
46
47
48 private final transient WeakReference<WebWindow> window_;
49
50
51
52
53
54 private transient PriorityQueue<JavaScriptJob> scheduledJobsQ_ = new PriorityQueue<>();
55
56 private transient ArrayList<Integer> cancelledJobs_ = new ArrayList<>();
57
58 private transient JavaScriptJob currentlyRunningJob_;
59
60
61 private static final AtomicInteger NEXT_JOB_ID_ = new AtomicInteger(1);
62
63
64 private static final Log LOG = LogFactory.getLog(JavaScriptJobManagerImpl.class);
65
66
67
68
69
70
71 JavaScriptJobManagerImpl(final WebWindow window) {
72 window_ = new WeakReference<>(window);
73 }
74
75
76 @Override
77 public synchronized int getJobCount() {
78 return scheduledJobsQ_.size() + (currentlyRunningJob_ != null ? 1 : 0);
79 }
80
81
82 @Override
83 public synchronized int getJobCount(final JavaScriptJobFilter filter) {
84 if (filter == null) {
85 return scheduledJobsQ_.size() + (currentlyRunningJob_ != null ? 1 : 0);
86 }
87
88 int count = 0;
89 if (currentlyRunningJob_ != null && filter.passes(currentlyRunningJob_)) {
90 count++;
91 }
92 for (final JavaScriptJob job : scheduledJobsQ_) {
93 if (filter.passes(job)) {
94 count++;
95 }
96 }
97 return count;
98 }
99
100
101 @Override
102 public int addJob(final JavaScriptJob job, final Page page) {
103 final WebWindow w = getWindow();
104 if (w == null) {
105
106
107
108
109 return 0;
110 }
111 if (w.getEnclosedPage() != page) {
112
113
114
115
116 return 0;
117 }
118 final int id = NEXT_JOB_ID_.getAndIncrement();
119 job.setId(Integer.valueOf(id));
120
121 synchronized (this) {
122 scheduledJobsQ_.add(job);
123
124 if (LOG.isDebugEnabled()) {
125 LOG.debug("job added to queue");
126 LOG.debug(" window is: " + w);
127 LOG.debug(" added job: " + job);
128 LOG.debug("after adding job to the queue, the queue is: ");
129 printQueue();
130 }
131
132 notify();
133 }
134
135 return id;
136 }
137
138
139 @Override
140 public synchronized void removeJob(final int id) {
141 for (final JavaScriptJob job : scheduledJobsQ_) {
142 final int jobId = job.getId().intValue();
143 if (jobId == id) {
144 scheduledJobsQ_.remove(job);
145 break;
146 }
147 }
148 cancelledJobs_.add(Integer.valueOf(id));
149 notify();
150 }
151
152
153 @Override
154 public synchronized void stopJob(final int id) {
155 for (final JavaScriptJob job : scheduledJobsQ_) {
156 final int jobId = job.getId().intValue();
157 if (jobId == id) {
158 scheduledJobsQ_.remove(job);
159
160 break;
161 }
162 }
163 cancelledJobs_.add(Integer.valueOf(id));
164 notify();
165 }
166
167
168 @Override
169 public synchronized void removeAllJobs() {
170 if (currentlyRunningJob_ != null) {
171 cancelledJobs_.add(currentlyRunningJob_.getId());
172 }
173 for (final JavaScriptJob job : scheduledJobsQ_) {
174 cancelledJobs_.add(job.getId());
175 }
176 scheduledJobsQ_.clear();
177 notify();
178 }
179
180
181 @Override
182 @SuppressWarnings("PMD.GuardLogStatement")
183 public int waitForJobs(final long timeoutMillis) {
184 final boolean debug = LOG.isDebugEnabled();
185 if (debug) {
186 LOG.debug("Waiting for all jobs to finish (will wait max " + timeoutMillis + " millis).");
187 }
188 if (timeoutMillis > 0) {
189 long now = System.currentTimeMillis();
190 final long end = now + timeoutMillis;
191
192 synchronized (this) {
193 while (getJobCount() > 0 && now < end) {
194 try {
195 wait(end - now);
196 }
197 catch (final InterruptedException e) {
198 LOG.error("InterruptedException while in waitForJobs", e);
199
200
201 Thread.currentThread().interrupt();
202 }
203
204
205 now = System.currentTimeMillis();
206 }
207 }
208 }
209 final int jobs = getJobCount();
210 if (debug) {
211 LOG.debug("Finished waiting for all jobs to finish (final job count is " + jobs + ").");
212 }
213 return jobs;
214 }
215
216
217 @Override
218 public int waitForJobsStartingBefore(final long delayMillis) {
219 return waitForJobsStartingBefore(delayMillis, -1, null);
220 }
221
222
223 @Override
224 public int waitForJobsStartingBefore(final long delayMillis, final long timeoutMillis) {
225 return waitForJobsStartingBefore(delayMillis, timeoutMillis, null);
226 }
227
228
229 @Override
230 @SuppressWarnings("PMD.GuardLogStatement")
231 public int waitForJobsStartingBefore(final long delayMillis, final JavaScriptJobFilter filter) {
232 return waitForJobsStartingBefore(delayMillis, -1, filter);
233 }
234
235
236 @Override
237 @SuppressWarnings("PMD.GuardLogStatement")
238 public int waitForJobsStartingBefore(final long delayMillis, final long timeoutMillis,
239 final JavaScriptJobFilter filter) {
240 final boolean debug = LOG.isDebugEnabled();
241
242 long now = System.currentTimeMillis();
243 long end = now + timeoutMillis;
244 if (timeoutMillis < 0 || timeoutMillis < delayMillis) {
245 end = -1;
246 }
247
248 final long latestExecutionTime = System.currentTimeMillis() + delayMillis;
249 if (debug) {
250 LOG.debug("Waiting for all jobs that have execution time before "
251 + delayMillis + " (" + latestExecutionTime + ") to finish");
252 }
253
254 final long interval = Math.max(40, delayMillis);
255 synchronized (this) {
256 JavaScriptJob earliestJob = getEarliestJob(filter);
257 boolean pending = earliestJob != null && earliestJob.getTargetExecutionTime() < latestExecutionTime;
258 pending = pending
259 || (
260 currentlyRunningJob_ != null
261 && (filter == null || filter.passes(currentlyRunningJob_))
262 && currentlyRunningJob_.getTargetExecutionTime() < latestExecutionTime
263 );
264
265 while (pending && (end == -1 || now < end)) {
266 try {
267 wait(Math.max(40, Math.min(interval, end - now)));
268 }
269 catch (final InterruptedException e) {
270 LOG.error("InterruptedException while in waitForJobsStartingBefore", e);
271
272
273 Thread.currentThread().interrupt();
274 }
275
276 earliestJob = getEarliestJob(filter);
277 pending = earliestJob != null && earliestJob.getTargetExecutionTime() < latestExecutionTime;
278 pending = pending
279 || (
280 currentlyRunningJob_ != null
281 && (filter == null || filter.passes(currentlyRunningJob_))
282 && currentlyRunningJob_.getTargetExecutionTime() < latestExecutionTime
283 );
284 if (pending) {
285 now = System.currentTimeMillis();
286 }
287 }
288 }
289
290 final int jobs = getJobCount(filter);
291 if (debug) {
292 LOG.debug("Finished waiting for all jobs that have target execution time earlier than "
293 + latestExecutionTime + ", final job count is " + jobs);
294 }
295 return jobs;
296 }
297
298
299 @Override
300 public synchronized void shutdown() {
301 scheduledJobsQ_.clear();
302 notify();
303 }
304
305
306
307
308
309
310
311
312 private WebWindow getWindow() {
313 return window_.get();
314 }
315
316
317
318
319 private void printQueue() {
320 if (LOG.isDebugEnabled()) {
321 LOG.debug("------ printing JavaScript job queue -----");
322 LOG.debug(" number of jobs on the queue: " + scheduledJobsQ_.size());
323 int count = 1;
324 for (final JavaScriptJob job : scheduledJobsQ_) {
325 LOG.debug(" " + count + ") Job target execution time: " + job.getTargetExecutionTime());
326 LOG.debug(" job to string: " + job);
327 LOG.debug(" job id: " + job.getId());
328 if (job.isPeriodic()) {
329 LOG.debug(" period: " + job.getPeriod().intValue());
330 }
331 count++;
332 }
333 LOG.debug("------------------------------------------");
334 }
335 }
336
337
338
339
340 @Override
341 public synchronized String jobStatusDump(final JavaScriptJobFilter filter) {
342 final String lineSeparator = System.lineSeparator();
343
344 final StringBuilder status = new StringBuilder(110)
345 .append("------ JavaScript job status -----")
346 .append(lineSeparator);
347
348 if (null != currentlyRunningJob_ && (filter == null || filter.passes(currentlyRunningJob_))) {
349 status.append(" current running job: ").append(currentlyRunningJob_.toString())
350 .append(" job id: ").append(currentlyRunningJob_.getId())
351 .append(lineSeparator)
352 .append(lineSeparator)
353 .append(lineSeparator);
354 }
355 status.append(" number of jobs on the queue: ")
356 .append(scheduledJobsQ_.size())
357 .append(lineSeparator);
358
359 int count = 1;
360 for (final JavaScriptJob job : scheduledJobsQ_) {
361 if (filter == null || filter.passes(job)) {
362 final long now = System.currentTimeMillis();
363 final long execTime = job.getTargetExecutionTime();
364 status.append(" ").append(count).append(") Job target execution time: ")
365 .append(execTime).append(" (should start in ")
366 .append((execTime - now) / 1000d).append("s)")
367 .append(lineSeparator)
368 .append(" job to string: ").append(job)
369 .append(lineSeparator).append(" job id: ").append(job.getId())
370 .append(lineSeparator);
371 if (job.isPeriodic()) {
372 status.append(" period: ")
373 .append(job.getPeriod().toString())
374 .append(lineSeparator);
375 }
376 count++;
377 }
378 }
379 status.append("------------------------------------------")
380 .append(lineSeparator);
381
382 return status.toString();
383 }
384
385
386
387
388 @Override
389 public JavaScriptJob getEarliestJob() {
390 return scheduledJobsQ_.peek();
391 }
392
393
394
395
396 @Override
397 public synchronized JavaScriptJob getEarliestJob(final JavaScriptJobFilter filter) {
398 if (filter == null) {
399 return scheduledJobsQ_.peek();
400 }
401
402 for (final JavaScriptJob job : scheduledJobsQ_) {
403 if (filter.passes(job)) {
404 return job;
405 }
406 }
407 return null;
408 }
409
410
411
412
413 @Override
414 @SuppressWarnings("PMD.GuardLogStatement")
415 public boolean runSingleJob(final JavaScriptJob givenJob) {
416 assert givenJob != null;
417 final JavaScriptJob job = getEarliestJob();
418 if (job != givenJob) {
419 return false;
420 }
421
422 final long currentTime = System.currentTimeMillis();
423 if (job.getTargetExecutionTime() > currentTime) {
424 return false;
425 }
426 synchronized (this) {
427 if (scheduledJobsQ_.remove(job)) {
428 currentlyRunningJob_ = job;
429 }
430
431 }
432
433 final boolean debug = LOG.isDebugEnabled();
434 final boolean isPeriodicJob = job.isPeriodic();
435 if (isPeriodicJob) {
436 final long jobPeriod = job.getPeriod().longValue();
437
438
439 long timeDifference = currentTime - job.getTargetExecutionTime();
440 timeDifference = (timeDifference / jobPeriod) * jobPeriod + jobPeriod;
441 job.setTargetExecutionTime(job.getTargetExecutionTime() + timeDifference);
442
443
444 synchronized (this) {
445 if (!cancelledJobs_.contains(job.getId())) {
446 if (debug) {
447 LOG.debug("Reschedulling job " + job);
448 }
449 scheduledJobsQ_.add(job);
450 notify();
451 }
452 }
453 }
454 if (debug) {
455 final String periodicJob = isPeriodicJob ? "interval " : "";
456 LOG.debug("Starting " + periodicJob + "job " + job);
457 }
458 try {
459 job.run();
460 }
461 catch (final RuntimeException e) {
462 LOG.error("Job run failed with unexpected RuntimeException: " + e.getMessage(), e);
463 }
464 finally {
465 synchronized (this) {
466 if (job == currentlyRunningJob_) {
467 currentlyRunningJob_ = null;
468 }
469 notify();
470 }
471 }
472 if (debug) {
473 final String periodicJob = isPeriodicJob ? "interval " : "";
474 LOG.debug("Finished " + periodicJob + "job " + job);
475 }
476 return true;
477 }
478
479
480
481
482
483
484
485 private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
486 in.defaultReadObject();
487
488
489 scheduledJobsQ_ = new PriorityQueue<>();
490 cancelledJobs_ = new ArrayList<>();
491 currentlyRunningJob_ = null;
492 }
493 }