Coverage for gwcelery/views.py: 83%

257 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-04-23 18:01 +0000

1"""Flask web application views.""" 

2import datetime 

3import json 

4import os 

5import platform 

6import re 

7import socket 

8import sys 

9from importlib import metadata 

10 

11import lal 

12from astropy.time import Time 

13from celery import group 

14from flask import (flash, jsonify, make_response, redirect, render_template, 

15 request, url_for) 

16from requests.exceptions import HTTPError 

17 

18from . import app as celery_app 

19from ._version import get_versions 

20from .flask import app, cache 

21from .tasks import (bayestar, circulars, core, external_skymaps, first2years, 

22 first2years_external, gracedb, orchestrator, skymaps, 

23 superevents) 

24from .util import PromiseProxy 

25 

# Optional URL prefix under which every route below is registered; read from
# the FLASK_APP_PREFIX environment variable and empty when it is unset.
PREFIX = os.environ.get('FLASK_APP_PREFIX', '')

# Tuple of installed distributions, wrapped in a PromiseProxy around a
# zero-argument factory (presumably evaluated on first use -- confirm
# against .util.PromiseProxy).
distributions = PromiseProxy(lambda: tuple(metadata.distributions()))

30 

31 

@app.route(PREFIX + '/')
def index():
    """Render the main dashboard page from the ``index.jinja2`` template.

    The template receives the Celery configuration, the fully qualified host
    name, the sorted LAL detector prefixes, the installed distributions, the
    platform string, version information, the Python version string and the
    ``joint_mdc_freq`` configuration value.
    """
    context = dict(
        conf=celery_app.conf,
        hostname=socket.getfqdn(),
        detectors=sorted(lal.cached_detector_by_prefix.keys()),
        distributions=distributions,
        platform=platform.platform(),
        versions=get_versions(),
        python_version=sys.version,
        joint_mdc_freq=celery_app.conf['joint_mdc_freq'],
    )
    return render_template('index.jinja2', **context)

45 

46 

def take_n(n, iterable):
    """Take the first `n` items of a collection.

    Parameters
    ----------
    n : int
        Maximum number of items to yield. Zero or negative values yield
        nothing.
    iterable : iterable
        Source of items. Exactly ``min(n, available)`` items are pulled from
        it, so a one-shot iterator is left positioned right after the last
        item that was yielded.

    Yields
    ------
    object
        The leading items of `iterable`, in order.
    """
    from itertools import islice

    # islice stops as soon as the count is reached; a manual
    # ``enumerate``/``break`` loop would fetch (and discard) one extra item.
    # Clamp to zero because islice rejects negative stop values.
    yield from islice(iterable, max(n, 0))

53 

54 

55# Regular expression for parsing query strings 

56# that look like GraceDB superevent names. 

57_typeahead_superevent_id_regex = re.compile( 

58 r'(?P<prefix>[MT]?)S?(?P<date>\d{0,6})(?P<suffix>[a-z]*)', 

59 re.IGNORECASE) 

60 

61 

@app.route(PREFIX + '/typeahead_superevent_id')
@cache.cached(query_string=True)
def typeahead_superevent_id():
    """Search GraceDB for superevents by ID.

    This involves some date parsing because GraceDB does not support directly
    searching for superevents by ID substring.

    The ``superevent_id`` query parameter is parsed as an optional category
    letter (``M`` or ``T``), an optional ``S``, up to six date digits and an
    optional letter suffix. If the parameter is missing, empty, or does not
    have that shape, production superevents from the last seven days are
    searched instead.

    Returns
    -------
    flask.Response
        JSON response holding a list of at most ``max_results`` matching
        superevent IDs, or an empty list if the partial date cannot be
        completed to a valid date.
    """
    max_results = 8  # maximum number of results to return
    batch_results = 32  # batch size for results from server

    term = request.args.get('superevent_id')
    match = _typeahead_superevent_id_regex.fullmatch(term) if term else None

    if match:
        # Determine GraceDB event category from regular expression.
        prefix = match['prefix'].upper() + 'S'
        category = {'T': 'test', 'M': 'MDC'}.get(
            match['prefix'].upper(), 'production')

        # Determine start date from regular expression by padding out
        # the partial date with missing digits defaulting to 000101.
        date_partial = match['date']
        date_partial_length = len(date_partial)
        try:
            date_start = datetime.datetime.strptime(
                date_partial + '000101'[date_partial_length:], '%y%m%d')
        except ValueError:  # invalid date
            return jsonify([])

        # Determine end date from regular expression by adding a very
        # loose upper bound on the number of days until the next
        # digit in the date rolls over. No need to be exact here.
        # The table is indexed by the number of date digits typed (0..6).
        date_end = date_start + datetime.timedelta(
            days=[36600, 3660, 366, 320, 32, 11, 1.1][date_partial_length])

        # Determine GraceDB event suffix from regular expression.
        suffix = match['suffix'].lower()
    else:
        prefix = 'S'
        category = 'production'
        # Naive UTC "now". datetime.utcnow() is deprecated since Python
        # 3.12; dropping tzinfo from an aware UTC timestamp gives the same
        # naive value, consistent with the naive dates built above.
        date_end = datetime.datetime.now(
            datetime.timezone.utc).replace(tzinfo=None)
        date_start = date_end - datetime.timedelta(days=7)
        date_partial = ''
        date_partial_length = 0
        suffix = ''

    # Query GraceDB.
    query = 'category: {} t_0: {} .. {}'.format(
        category, Time(date_start).gps, Time(date_end).gps)
    response = gracedb.client.superevents.search(
        query=query, sort='superevent_id', count=batch_results)

    # Filter superevent IDs that match the search term: the typed prefix and
    # date digits, any digits for the untyped remainder of the date, then the
    # typed suffix followed by any further lowercase letters.
    regex = re.compile(r'{}{}\d{{{}}}{}[a-z]*'.format(
        prefix, date_partial, 6 - date_partial_length, suffix))
    superevent_ids = (
        superevent['superevent_id'] for superevent
        in response if regex.fullmatch(superevent['superevent_id']))

    # Return only the first few matches.
    return jsonify(list(take_n(max_results, superevent_ids)))

124 

125 

@app.route(PREFIX + '/typeahead_event_id')
@cache.cached(query_string=True)
def typeahead_event_id():
    """Search GraceDB for events by ID.

    Reads the ``superevent_id`` query parameter and looks up the events
    belonging to that superevent. IDs starting with ``T`` or ``M`` add the
    ``Test`` or ``MDC`` search term respectively.

    Returns
    -------
    flask.Response
        JSON response holding the events for which
        ``superevents.is_complete`` is true, each augmented with an ``snr``
        entry, in descending order of ``superevents.keyfunc``. The list is
        empty if the parameter is missing or blank, or if GraceDB answers
        with an HTTP error.
    """
    # A missing parameter yields None; treat it like an empty string instead
    # of failing on ``None.strip()``.
    superevent_id = (request.args.get('superevent_id') or '').strip()
    if not superevent_id:
        return jsonify([])

    query_terms = [f'superevent: {superevent_id}']
    if superevent_id.startswith('T'):
        query_terms.append('Test')
    elif superevent_id.startswith('M'):
        query_terms.append('MDC')
    query = ' '.join(query_terms)
    try:
        results = gracedb.get_events(query)
    except HTTPError:
        results = []
    results = [dict(r, snr=superevents.get_snr(r)) for r in results
               if superevents.is_complete(r)]
    # reversed(sorted(...)) rather than sorted(reverse=True): the two differ
    # in how ties are ordered, and the existing ordering is kept here.
    return jsonify(list(reversed(sorted(results, key=superevents.keyfunc))))

144 

145 

def _search_by_tag_and_filename(superevent_id, filename, extension, tag):
    """List versioned filenames from a superevent's GraceDB log.

    A log record is selected when `tag` is among its ``tag_names`` and its
    ``filename`` starts with `filename` and ends with `extension`.

    Returns
    -------
    list of str
        One ``'<filename>,<file_version>'`` string per selected record. The
        list is empty when GraceDB responds with HTTP 404.

    Raises
    ------
    requests.exceptions.HTTPError
        For any HTTP error other than 404.
    """
    try:
        matches = []
        for entry in gracedb.get_log(superevent_id):
            if tag not in entry['tag_names']:
                continue
            if (entry['filename'].startswith(filename)
                    and entry['filename'].endswith(extension)):
                matches.append('{},{}'.format(
                    entry['filename'], entry['file_version']))
        return matches
    except HTTPError as e:
        # Only a 404 from the server is tolerated; anything else propagates.
        if e.response.status_code != 404:
            raise
        return []

160 

161 

@app.route(PREFIX + '/typeahead_skymap_filename')
@cache.cached(query_string=True)
def typeahead_skymap_filename():
    """Search for sky maps by filename.

    Uses the ``superevent_id`` and ``filename`` query parameters (each
    defaulting to an empty string) and returns, as JSON, the log files tagged
    ``sky_loc`` whose names end in ``.multiorder.fits``.
    """
    superevent_id = request.args.get('superevent_id') or ''
    filename_prefix = request.args.get('filename') or ''
    matches = _search_by_tag_and_filename(
        superevent_id, filename_prefix, '.multiorder.fits', 'sky_loc')
    return jsonify(matches)

171 

172 

@app.route(PREFIX + '/typeahead_em_bright_filename')
@cache.cached(query_string=True)
def typeahead_em_bright_filename():
    """Search em_bright files by filename.

    Uses the ``superevent_id`` and ``filename`` query parameters (each
    defaulting to an empty string) and returns, as JSON, the log files tagged
    ``em_bright`` whose names end in ``.json``.
    """
    superevent_id = request.args.get('superevent_id') or ''
    filename_prefix = request.args.get('filename') or ''
    matches = _search_by_tag_and_filename(
        superevent_id, filename_prefix, '.json', 'em_bright')
    return jsonify(matches)

182 

183 

@app.route(PREFIX + '/typeahead_p_astro_filename')
@cache.cached(query_string=True)
def typeahead_p_astro_filename():
    """Search p_astro files by filename.

    Uses the ``superevent_id`` and ``filename`` query parameters (each
    defaulting to an empty string) and returns, as JSON, the log files tagged
    ``p_astro`` whose names end in ``.json``.
    """
    superevent_id = request.args.get('superevent_id') or ''
    filename_prefix = request.args.get('filename') or ''
    matches = _search_by_tag_and_filename(
        superevent_id, filename_prefix, '.json', 'p_astro')
    return jsonify(matches)

193 

194 

@celery_app.task(shared=False, ignore_result=True)
def _construct_igwn_alert_and_send_prelim_alert(superevent_event_list,
                                                superevent_id,
                                                initiate_voevent=True):
    """Build an alert payload and hand it to the preliminary alert workflow.

    Parameters
    ----------
    superevent_event_list : sequence
        Two items, the superevent followed by the event.
    superevent_id : str
        Stored as the ``uid`` of the alert payload.
    initiate_voevent : bool
        Forwarded unchanged to
        ``orchestrator.earlywarning_preliminary_alert``.
    """
    superevent, event = superevent_event_list
    orchestrator.earlywarning_preliminary_alert(
        event,
        dict(uid=superevent_id, object=superevent),
        alert_type='preliminary',
        initiate_voevent=initiate_voevent)

211 

212 

@app.route(PREFIX + '/send_preliminary_gcn', methods=['POST'])
def send_preliminary_gcn():
    """Handle submission of preliminary alert form.

    Requires the ``superevent_id`` and ``event_id`` form fields. After
    checking that the event can be fetched from GraceDB, queues a canvas that
    logs the request, makes the event the superevent's preferred event,
    re-fetches both records and sends the preliminary alert. Always redirects
    to the index page with a flashed status message.
    """
    superevent_id = request.form.get('superevent_id')
    event_id = request.form.get('event_id')
    if not (superevent_id and event_id):
        flash('No alert sent. Please fill in all fields.', 'danger')
        return redirect(url_for('index'))

    try:
        event = gracedb.get_event(event_id)
    except HTTPError as e:
        flash(f'No action performed. GraceDB query for {event_id} '
              f'returned error code {e.response.status_code}.', 'danger')
        return redirect(url_for('index'))

    log_message = (
        'User {} queued a Preliminary alert through the dashboard.'
        .format(request.remote_user or '(unknown)'))
    canvas = (
        gracedb.upload.s(
            None, None, superevent_id, log_message, tags=['em_follow'])
        |
        gracedb.update_superevent.si(
            superevent_id, preferred_event=event_id, t_0=event['gpstime'])
        |
        group(
            gracedb.get_superevent.si(superevent_id),
            gracedb.get_event.si(event_id)
        )
        |
        _construct_igwn_alert_and_send_prelim_alert.s(superevent_id)
    )
    canvas.delay()
    flash('Queued preliminary alert for {}.'.format(superevent_id),
          'success')
    return redirect(url_for('index'))

249 

250 

@app.route(PREFIX + '/change_preferred_event', methods=['POST'])
def change_preferred_event():
    """Handle submission of the change-preferred-event form.

    Requires the ``superevent_id`` and ``event_id`` form fields. Once both
    records have been fetched from GraceDB, queues a canvas that logs the
    change, updates the superevent's preferred event and runs the
    preliminary alert workflow with ``initiate_voevent=False``. If the event
    is not already the pipeline-preferred event of its own pipeline, a second
    canvas is queued to make it so. Always redirects to the index page with a
    flashed status message.
    """
    superevent_id = request.form.get('superevent_id')
    event_id = request.form.get('event_id')
    if not (superevent_id and event_id):
        flash('No change performed. Please fill in all fields.', 'danger')
        return redirect(url_for('index'))

    try:
        event = gracedb.get_event(event_id)
    except HTTPError as e:
        flash(f'No change performed. GraceDB query for {event_id} '
              f'returned error code {e.response.status_code}.', 'danger')
        return redirect(url_for('index'))

    try:
        superevent = gracedb.get_superevent(superevent_id)
    except HTTPError as e:
        flash(f'No change performed. GraceDB query for {superevent_id} '
              f'returned error code {e.response.status_code}.', 'danger')
        return redirect(url_for('index'))

    log_message = (
        celery_app.conf['views_manual_preferred_event_log_message']
        .format(request.remote_user or '(unknown)', event_id))
    canvas = (
        gracedb.upload.s(
            None, None, superevent_id, log_message, tags=['em_follow'])
        |
        gracedb.update_superevent.si(
            superevent_id, preferred_event=event_id,
            t_0=event['gpstime'])
        |
        _construct_igwn_alert_and_send_prelim_alert.si(
            [superevent, event],
            superevent_id,
            initiate_voevent=False
        )
    )
    canvas.delay()

    # Update pipeline-preferred event if the new preferred event is not
    # already the pipeline-preferred event for the pipeline that uploaded
    # it.
    current_pipeline_pref = superevent['pipeline_preferred_events'].get(
        event['pipeline'], {})
    if current_pipeline_pref.get('graceid', '') != event_id:
        pipeline_canvas = (
            gracedb.upload.s(
                None, None, superevent_id,
                'Manual update of preferred event triggered update of '
                f'{event["pipeline"]}-preferred event to {event_id}',
                tags=['em_follow'])
            |
            gracedb.add_pipeline_preferred_event.si(
                superevent_id, event_id)
        )
        pipeline_canvas.delay()

    flash('Changed preferred event for {}.'.format(superevent_id),
          'success')
    return redirect(url_for('index'))

310 

311 

@app.route(PREFIX + '/change_pipeline_preferred_event', methods=['POST'])
def change_pipeline_preferred_event():
    """Handle submission of the change-pipeline-preferred-event form.

    Requires the ``superevent_id``, ``pipeline`` and ``event_id`` form
    fields. The change is refused when the event was not uploaded by the
    given pipeline, or when that pipeline also produced the superevent's
    current preferred event. Otherwise queues a canvas that logs the change,
    registers the pipeline-preferred event and runs the preliminary alert
    workflow with ``initiate_voevent=False``. Always redirects to the index
    page with a flashed status message.
    """
    superevent_id = request.form.get('superevent_id')
    pipeline = request.form.get('pipeline')
    event_id = request.form.get('event_id')
    if not (superevent_id and pipeline and event_id):
        flash('No change performed. Please fill in all fields.', 'danger')
        return redirect(url_for('index'))

    try:
        event = gracedb.get_event(event_id)
    except HTTPError as e:
        flash(f'No change performed. GraceDB query for {event_id} '
              f'returned error code {e.response.status_code}.', 'danger')
        return redirect(url_for('index'))

    # Check that specified event is from specified pipeline
    if event['pipeline'].lower() != pipeline.lower():
        flash(f'No change performed. {event_id} was uploaded by '
              f'{event["pipeline"].lower()} and cannot be the '
              f'{pipeline.lower()}-preferred event.', 'danger')
        return redirect(url_for('index'))

    try:
        superevent = gracedb.get_superevent(superevent_id)
    except HTTPError as e:
        flash(f'No change performed. GraceDB query for {superevent_id} '
              f'returned error code {e.response.status_code}.', 'danger')
        return redirect(url_for('index'))

    # Check that this pipeline's preferred event is not the
    # superevent's preferred event
    preferred_pipeline = superevent['preferred_event_data']['pipeline']
    if preferred_pipeline.lower() == pipeline.lower():
        flash(f'No change performed. User specified pipeline, '
              f'{pipeline.lower()}, is the same pipeline that '
              f'produced {superevent_id}\'s preferred event.', 'danger')
        return redirect(url_for('index'))

    log_message = (
        'User {} queued a {} preferred event change to {}.'
        .format(request.remote_user or '(unknown)', pipeline,
                event_id))
    canvas = (
        gracedb.upload.s(
            None, None, superevent_id, log_message, tags=['em_follow'])
        |
        gracedb.add_pipeline_preferred_event.si(
            superevent_id, event_id)
        |
        _construct_igwn_alert_and_send_prelim_alert.si(
            [superevent, event],
            superevent_id,
            initiate_voevent=False
        )
    )
    canvas.delay()
    flash(f'Changed {pipeline.lower()} preferred event for '
          f'{superevent_id}.', 'success')
    return redirect(url_for('index'))

371 

372 

@app.route(PREFIX + '/send_update_gcn', methods=['POST'])
def send_update_gcn():
    """Handle submission of update alert form.

    Requires the ``superevent_id``, ``skymap_filename``,
    ``em_bright_filename`` and ``p_astro_filename`` form fields. When all are
    filled in, queues a canvas that logs the request and then calls
    ``orchestrator.update_alert`` with the three filenames. Always redirects
    to the index page with a flashed status message.
    """
    superevent_id = request.form.get('superevent_id')
    filenames = [
        request.form.get(key)
        for key in ('skymap_filename', 'em_bright_filename',
                    'p_astro_filename')]
    if not (superevent_id and all(filenames)):
        flash('No alert sent. Please fill in all fields.', 'danger')
        return redirect(url_for('index'))

    log_message = (
        'User {} queued an Update alert through the dashboard.'
        .format(request.remote_user or '(unknown)'))
    canvas = (
        gracedb.upload.s(
            None, None, superevent_id, log_message, tags=['em_follow'])
        |
        orchestrator.update_alert.si(filenames, superevent_id)
    )
    canvas.delay()
    flash('Queued update alert for {}.'.format(superevent_id), 'success')
    return redirect(url_for('index'))

394 

395 

@app.route(PREFIX + '/create_medium_latency_gcn_circular', methods=['POST'])
def create_medium_latency_gcn_circular():
    """Handle submission of medium_latency GCN Circular form.

    With the ``ext_event_id`` form field filled in, responds with the output
    of ``circulars.create_medium_latency_circular`` served as ``text/plain``.
    Otherwise flashes an error and redirects to the index page.
    """
    ext_event_id = request.form.get('ext_event_id')
    if not ext_event_id:
        flash('No circular created. Please fill in external event ID',
              'danger')
        return redirect(url_for('index'))

    circular_text = circulars.create_medium_latency_circular(ext_event_id)
    response = make_response(circular_text)
    response.headers["content-type"] = "text/plain"
    return response

409 

410 

@app.route(PREFIX + '/create_update_gcn_circular', methods=['POST'])
def create_update_gcn_circular():
    """Handle submission of GCN Circular form.

    The update types are those of ``sky_localization``, ``em_bright``,
    ``p_astro`` and ``raven`` that are set in the form. With a
    ``superevent_id`` and at least one update type, responds with the output
    of ``circulars.create_update_circular`` served as ``text/plain``.
    Otherwise flashes an error and redirects to the index page.
    """
    superevent_id = request.form.get('superevent_id')
    updates = []
    for key in ['sky_localization', 'em_bright', 'p_astro', 'raven']:
        if request.form.get(key):
            updates.append(key)

    if not (superevent_id and updates):
        flash('No circular created. Please fill in superevent ID and at '
              'least one update type.', 'danger')
        return redirect(url_for('index'))

    circular_text = circulars.create_update_circular(
        superevent_id, update_types=updates)
    response = make_response(circular_text)
    response.headers["content-type"] = "text/plain"
    return response

427 

428 

@app.route(PREFIX + '/download_upload_external_skymap', methods=['POST'])
def download_upload_external_skymap():
    """Download sky map from URL to be uploaded to external event.

    Requires the ``ext_id`` and ``skymap_url`` form fields. Calls
    ``external_skymaps.get_upload_external_skymap`` directly with a minimal
    event dictionary whose ``search`` field is ``'FromURL'``, which indicates
    to that function to use the provided URL to download the sky map. Always
    redirects to the index page with a flashed status message.
    """
    ext_id = request.form.get('ext_id')
    skymap_url = request.form.get('skymap_url')
    if not (ext_id and skymap_url):
        flash('No skymap uploaded. Please fill in all fields.', 'danger')
        return redirect(url_for('index'))

    external_skymaps.get_upload_external_skymap(
        {'graceid': ext_id, 'search': 'FromURL'}, skymap_link=skymap_url)
    flash('Downloaded sky map for {}.'.format(ext_id),
          'success')
    return redirect(url_for('index'))

446 

447 

@celery_app.task(queue='exttrig',
                 shared=False)
def _update_preferred_external_event(ext_event, superevent_id):
    """Update preferred external event to given external event.

    For the GRB-type searches (``GRB``, ``SubGRB``, ``SubGRBTargeted``) the
    temporal and spatiotemporal coincidence FARs are read from the external
    event's ``coincidence_far.json`` file. For any other search both are
    passed as ``None``.
    """
    # FIXME: Consider consolidating with raven.update_coinc_far by using
    # a single function in superevents.py
    time_coinc_far = space_coinc_far = None
    if ext_event['search'] in {'GRB', 'SubGRB', 'SubGRBTargeted'}:
        coinc_far_dict = gracedb.download(
            'coincidence_far.json', ext_event['graceid'])
        # The download may come back as raw JSON instead of a parsed dict.
        if not isinstance(coinc_far_dict, dict):
            coinc_far_dict = json.loads(coinc_far_dict)
        time_coinc_far = coinc_far_dict['temporal_coinc_far']
        space_coinc_far = coinc_far_dict['spatiotemporal_coinc_far']

    gracedb.update_superevent(superevent_id, em_type=ext_event['graceid'],
                              time_coinc_far=time_coinc_far,
                              space_coinc_far=space_coinc_far)

467 

468 

@app.route(PREFIX + '/apply_raven_labels', methods=['POST'])
def apply_raven_labels():
    """Apply the RAVEN alert label and update the preferred external event
    to the given coincidence.

    Requires the ``superevent_id``, ``ext_id`` and ``event_id`` form fields.
    Queues a canvas that fetches the external event, makes it the
    superevent's preferred external event, and then applies ``RAVEN_ALERT``
    to the superevent, the external event and the event, in that order.
    Always redirects to the index page with a flashed status message.
    """
    superevent_id = request.form.get('superevent_id')
    ext_id = request.form.get('ext_id')
    event_id = request.form.get('event_id')
    if not (superevent_id and ext_id and event_id):
        flash('No alert sent. Please fill in all fields.', 'danger')
        return redirect(url_for('index'))

    canvas = (
        gracedb.get_event.si(ext_id)
        |
        _update_preferred_external_event.s(superevent_id)
        |
        gracedb.create_label.si('RAVEN_ALERT', superevent_id)
        |
        gracedb.create_label.si('RAVEN_ALERT', ext_id)
        |
        gracedb.create_label.si('RAVEN_ALERT', event_id)
    )
    canvas.delay()
    flash('Applied RAVEN alert label for {}.'.format(superevent_id),
          'success')
    return redirect(url_for('index'))

493 

494 

@app.route(PREFIX + '/send_mock_event', methods=['POST'])
def send_mock_event():
    """Handle submission of mock alert form.

    Queues ``first2years.upload_event``, flashes a confirmation and redirects
    to the index page.
    """
    upload_task = first2years.upload_event
    upload_task.delay()
    flash('Queued a mock event.', 'success')
    destination = url_for('index')
    return redirect(destination)

501 

502 

@gracedb.task(shared=False)
def _create_upload_external_event(gpstime):
    """Create and upload a mock Fermi MDC external event.

    The event time is derived from `gpstime` via
    ``first2years_external._offset_time`` with the ``'CBC'``, ``'Fermi'`` and
    ``'GRB'`` arguments. Returns whatever
    ``first2years_external.create_upload_external_event`` returns.
    """
    return first2years_external.create_upload_external_event(
        first2years_external._offset_time(gpstime, 'CBC', 'Fermi', 'GRB'),
        'Fermi', 'MDC')

512 

513 

@app.route(PREFIX + '/send_mock_joint_event', methods=['POST'])
def send_mock_joint_event():
    """Handle submission of the mock joint alert form.

    Queues a canvas that uploads a mock event and then, with a countdown of
    5, creates and uploads the matching mock external event from the first
    task's result. Flashes a confirmation and redirects to the index page.
    """
    upload_gw_event = first2years.upload_event.si()
    upload_external_event = _create_upload_external_event.s().set(countdown=5)
    canvas = upload_gw_event | upload_external_event
    canvas.delay()
    flash('Queued a mock joint event.', 'success')
    return redirect(url_for('index'))

524 

525 

@app.route(PREFIX + '/create_skymap_with_disabled_detectors', methods=['POST'])
def create_skymap_with_disabled_detectors():
    """Create a BAYESTAR sky map with one or more disabled detectors.

    The ``event_id`` form field names the event. Every other key present in
    the form is taken as the name of a detector to disable. When both are
    given, queues a canvas that downloads ``coinc.xml``, runs
    ``bayestar.localize`` with those detectors disabled, uploads the result
    as ``bayestar.no-<detectors>.multiorder.fits`` and annotates it. Always
    redirects to the index page with a flashed status message.
    """
    form = request.form.to_dict()
    # Default to None so a request without ``event_id`` reaches the
    # "fill in all fields" message below instead of raising KeyError.
    graceid = form.pop('event_id', None)
    # Whatever remains in the form after removing event_id is the set of
    # detectors to disable.
    disabled_detectors = sorted(form.keys())
    tags = ['sky_loc', 'public']
    if graceid and disabled_detectors:
        filename = f'bayestar.no-{"".join(disabled_detectors)}.multiorder.fits'
        (
            gracedb.download.s('coinc.xml', graceid)
            |
            bayestar.localize.s(graceid, disabled_detectors=disabled_detectors)
            |
            group(
                core.identity.s(),
                gracedb.upload.s(
                    filename, graceid,
                    'sky localization complete', tags
                )
            )
            |
            skymaps.annotate_fits_tuple.s(graceid, tags)
        ).delay()
        flash('Creating sky map for event ID ' + graceid +
              ' with these disabled detectors: ' +
              ' '.join(disabled_detectors), 'success')
    else:
        flash('No sky map created. Please fill in all fields.', 'danger')
    return redirect(url_for('index'))

556 

557 

@app.route(PREFIX + '/copy_sky_map_between_events', methods=['POST'])
def copy_sky_map_between_events():
    """Copy a sky map from an event to a superevent.

    Reads the ``superevent_id``, ``event_id`` and ``skymap_filename`` form
    fields. ``skymap_filename`` may carry a ``,<version>`` suffix, which is
    dropped from the name used for the upload. Queues a canvas that downloads
    the file from the event, uploads it to the superevent and annotates it,
    then flashes a confirmation and redirects to the index page.
    """
    superevent_id = request.form['superevent_id']
    graceid = request.form['event_id']
    skymap_filename = request.form['skymap_filename']
    # Strip a trailing ",<version>" if there is one. rpartition() returns an
    # empty head when the separator is absent, so fall back to the full name
    # rather than uploading under an empty filename.
    skymap_filename_no_version, separator, _ = skymap_filename.rpartition(',')
    if not separator:
        skymap_filename_no_version = skymap_filename
    tags = ['sky_loc', 'public']
    (
        gracedb.download.s(skymap_filename, graceid)
        |
        group(
            core.identity.s(),
            gracedb.upload.s(
                skymap_filename_no_version, superevent_id,
                f'sky map copied from {graceid}', tags
            )
        )
        |
        skymaps.annotate_fits_tuple.s(superevent_id, tags)
    ).delay()
    flash(f'Copying file {skymap_filename} from {graceid} to {superevent_id}',
          'success')
    return redirect(url_for('index'))