Coverage for gwcelery/tasks/first2years_external.py: 100%

103 statements  

« prev     ^ index     » next       coverage.py v7.4.2, created at 2024-03-28 17:21 +0000

1"""Create mock external events to be in coincidence 

2 with MDC superevents.""" 

3 

4import random 

5import re 

6from pathlib import Path 

7 

8import numpy as np 

9from astropy.time import Time 

10from lxml import etree 

11 

12from .. import app 

13from ..tests import data 

14from ..util import read_json 

15from . import external_triggers, igwn_alert, raven 

16 

17 

18def create_upload_external_event(gpstime, pipeline, ext_search): 

19 """Create a random external event VOEvent or Kafka alert packet. 

20 

21 Parameters 

22 ---------- 

23 gpstime : float 

24 Event's GPS time 

25 pipeline : str 

26 External trigger pipeline name 

27 ext_search : str 

28 Search field for external event 

29 

30 Returns 

31 ------- 

32 event : str or dict 

33 Alert packet in format as if it was sent from GCN or Kafka, in a 

34 XML GCN notice alert packet in string format for the GRB or SubGRB 

35 search or a dictionary Kafka packet if for SubGRBTargeted search. 

36 """ 

37 new_date = str(Time(gpstime, format='gps', scale='utc').isot) + 'Z' 

38 new_TrigID = str(int(gpstime)) 

39 ra = random.uniform(0, 360) 

40 thetas = np.arange(-np.pi / 2, np.pi / 2, .01) 

41 dec = random.choices(np.rad2deg(thetas), 

42 weights=np.cos(thetas) / sum(np.cos(thetas)))[0] 

43 error = .05 if pipeline == 'Swift' else random.uniform(1, 30) 

44 

45 if pipeline in {'Fermi', 'Swift', 'INTEGRAL'}: 

46 is_grb = True 

47 else: 

48 is_grb = False 

49 

50 # If SubGRBTargeted modify alert packet from Kafka 

51 if ext_search == 'SubGRBTargeted': 

52 if pipeline == 'Fermi': 

53 # Template based on: 

54 # https://github.com/joshuarwood/gcn-schema/tree/main/gcn/notices/ 

55 # fermi/gbm 

56 alert = read_json(data, 'kafka_alert_fermi.json') 

57 # Remove sky map file to create our own sky map 

58 alert.pop('healpix_file') 

59 alert['dec_uncertainty'] = [error] 

60 elif pipeline == 'Swift': 

61 # Template based on: 

62 # https://github.com/nasa-gcn/gcn-schema/tree/main/gcn/notices/ 

63 # swift/bat/guano 

64 alert = read_json(data, 'kafka_alert_swift.json') 

65 alert['ra_uncertainty'] = [error] 

66 else: 

67 raise ValueError( 

68 'Only use Fermi or Swift for SubGRBTargeted search') 

69 

70 alert['trigger_time'] = new_date 

71 alert['alert_datetime'] = new_date 

72 alert['id'] = [new_TrigID] 

73 alert['ra'] = ra 

74 alert['dec'] = dec 

75 # Generate FAR from max threshold and trials factors 

76 alert['far'] = \ 

77 (app.conf['raven_targeted_far_thresholds']['GRB'][pipeline] * 

78 random.uniform(0, 1)) 

79 

80 external_triggers.handle_targeted_kafka_alert(alert) 

81 

82 # Return VOEvent for testing until GraceDB natively ingests kafka 

83 # alerts 

84 return external_triggers._kafka_to_voevent(alert)[0] 

85 

86 # Otherwise modify respective VOEvent template 

87 else: 

88 fname = str(Path(__file__).parent / 

89 '../tests/data/{0}{1}_{2}gcn.xml'.format( 

90 pipeline.lower(), 

91 '_subthresh' if ext_search == 'SubGRB' else '', 

92 'grb_' if is_grb else '')) 

93 

94 root = etree.parse(fname) 

95 # Change ivorn to indicate if this is an MDC event or O3 replay event 

96 root.xpath('.')[0].attrib['ivorn'] = \ 

97 'ivo://lvk.internal/{0}#{1}{2}_event_{3}'.format( 

98 pipeline if pipeline != 'Swift' else 'SWIFT', 

99 '_subthresh' if ext_search == 'SubGRB' else '', 

100 'MDC-test' if ext_search == 'MDC' else 'O3-replay', 

101 new_date).encode() 

102 

103 # Change times to chosen time 

104 root.find("./Who/Date").text = str(new_date).encode() 

105 root.find(("./WhereWhen/ObsDataLocation/" 

106 "ObservationLocation/AstroCoords/Time/TimeInstant/" 

107 "ISOTime")).text = str(new_date).encode() 

108 if ext_search == 'SubGRB': 

109 root.find("./What/Param[@name='Trans_Num']").attrib['value'] = \ 

110 str(new_TrigID).encode() 

111 else: 

112 root.find("./What/Param[@name='TrigID']").attrib['value'] = \ 

113 str(new_TrigID).encode() 

114 

115 if is_grb: 

116 # Give random sky position 

117 root.find(("./WhereWhen/ObsDataLocation/" 

118 "ObservationLocation/AstroCoords/Position2D/Value2/" 

119 "C1")).text = str(ra).encode() 

120 root.find(("./WhereWhen/ObsDataLocation/" 

121 "ObservationLocation/AstroCoords/Position2D/Value2/" 

122 "C2")).text = str(dec).encode() 

123 if pipeline != 'Swift': 

124 root.find( 

125 ("./WhereWhen/ObsDataLocation/" 

126 "ObservationLocation/AstroCoords/Position2D/" 

127 "Error2Radius")).text = str(error).encode() 

128 

129 event = etree.tostring(root, xml_declaration=True, encoding="UTF-8", 

130 pretty_print=True) 

131 

132 # Upload as from GCN 

133 if is_grb: 

134 external_triggers.handle_grb_gcn(event) 

135 else: 

136 external_triggers.handle_snews_gcn(event) 

137 

138 return event 

139 

140 

141def _offset_time(gpstime, group, pipeline, ext_search): 

142 """Offsets the given GPS time by applying a random number within a time 

143 window, determine by the search being done. 

144 

145 Parameters 

146 ---------- 

147 gpstime : float 

148 Event's GPS time 

149 group : str 

150 Burst or CBC 

151 pipeline : str 

152 Pipeline field for external event 

153 ext_search : str 

154 Search field for external event 

155 

156 Returns 

157 ------- 

158 gsptime_adjusted : float 

159 Event's original gps time plus a random number within the determined 

160 search window 

161 

162 """ 

163 tl, th = raven._time_window('S1', group, [pipeline], [ext_search]) 

164 return gpstime + random.uniform(tl, th) 

165 

166 

167def _is_joint_mdc(graceid, se_search): 

168 """Determine whether to upload an external events using user-defined 

169 frequency of MDC or AllSky superevents. 

170 

171 Looks at the ending letters of a superevent (e.g. 'ac' from 'MS190124ac'), 

172 converts to a number, and checks if divisible by a number given in the 

173 configuration file. 

174 

175 For example, if the configuration number 

176 :obj:`~gwcelery.conf.joint_mdc_freq` is 10, 

177 this means joint events with superevents ending with 'j', 't', 'ad', etc. 

178 

179 Parameters 

180 ---------- 

181 graceid : str 

182 GraceDB ID of superevent 

183 se_search : str 

184 Search field for preferred event in superevent 

185 

186 Returns 

187 ------- 

188 is_joint_mdc : bool 

189 Returns True if the GraceDB ID matches pre-determined frequency rates 

190 

191 """ 

192 end_string = re.split(r'\d+', graceid)[-1].lower() 

193 val = 0 

194 for i in range(len(end_string)): 

195 val += (ord(end_string[i]) - 96) * 26 ** (len(end_string) - i - 1) 

196 return val % int(app.conf['joint_mdc_freq']) == 0 if se_search == 'MDC' \ 

197 else val % int(app.conf['joint_O3_replay_freq']) == 0 

198 

199 

200@igwn_alert.handler('mdc_superevent', 

201 'superevent', 

202 shared=False) 

203def upload_external_event(alert, ext_search=None): 

204 """Upload a random GRB event(s) for a certain fraction of MDC 

205 or O3-replay superevents. 

206 

207 Notes 

208 ----- 

209 Every n superevents, upload a GRB candidate within the 

210 standard CBC-GRB or Burst-GRB search window, where the frequency n is 

211 determined by the configuration variable 

212 :obj:`~gwcelery.conf.joint_mdc_freq` or 

213 :obj:`~gwcelery.conf.joint_O3_replay_freq`. 

214 

215 For O3 replay testing with RAVEN pipeline, only runs on gracedb-playground. 

216 

217 Parameters 

218 ---------- 

219 alert : dict 

220 IGWN alert packet 

221 ext_search : str 

222 Search field for external event 

223 

224 Returns 

225 ------- 

226 events, pipelines : tuple 

227 Returns tuple of the list of external events created and the list of 

228 pipelines chosen for each event 

229 

230 """ 

231 if alert['alert_type'] != 'new': 

232 return 

233 se_search = alert['object']['preferred_event_data']['search'] 

234 group = alert['object']['preferred_event_data']['group'] 

235 is_gracedb_playground = app.conf['gracedb_host'] \ 

236 == 'gracedb-playground.ligo.org' 

237 joint_mdc_alert = se_search == 'MDC' and _is_joint_mdc(alert['uid'], 'MDC') 

238 joint_allsky_alert = se_search == 'AllSky' and \ 

239 _is_joint_mdc(alert['uid'], 'AllSky') and is_gracedb_playground and \ 

240 group in {'CBC', 'Burst'} 

241 if not (joint_mdc_alert or joint_allsky_alert): 

242 return 

243 

244 # Potentially upload 1, 2, or 3 GRB events 

245 num = 1 + np.random.choice(np.arange(3), p=[.6, .3, .1]) 

246 

247 if joint_mdc_alert: 

248 # If joint MDC alert, make external event MDC 

249 if ext_search is None: 

250 ext_search = 'MDC' 

251 elif ext_search == 'MDC': 

252 pass 

253 else: 

254 raise ValueError('External search must be "MDC" if MDC superevent') 

255 # If O3 replay, choose search from acceptable list 

256 elif ext_search is None: 

257 # Determine search for external event and then pipelines 

258 ext_search = \ 

259 (np.random.choice(['GRB', 'SubGRB', 'SubGRBTargeted'], 

260 p=[.6, .1, .3]) 

261 if joint_allsky_alert else 'GRB') 

262 # Choose pipeline(s) based on search 

263 if ext_search in {'GRB', 'MDC'}: 

264 pipelines = np.random.choice(['Fermi', 'Swift', 'INTEGRAL'], 

265 p=[.6, .3, .1,], 

266 size=num, replace=False) 

267 elif ext_search == 'SubGRB': 

268 pipelines = np.full(num, 'Fermi') 

269 elif ext_search == 'SubGRBTargeted': 

270 # Only two current pipelines in targeted search so map 3 => 2 

271 num = min(num, 2) 

272 pipelines = np.random.choice(['Fermi', 'Swift'], 

273 p=[.5, .5], size=num, replace=False) 

274 events = [] 

275 for pipeline in pipelines: 

276 gpstime = float(alert['object']['t_0']) 

277 new_time = _offset_time(gpstime, group, pipeline, ext_search) 

278 

279 # Choose external grb pipeline to simulate 

280 ext_event = create_upload_external_event( 

281 new_time, pipeline, ext_search) 

282 

283 events.append(ext_event) 

284 

285 return events, pipelines 

286 

287 

288@app.task(ignore_result=True, 

289 shared=False) 

290def upload_snews_event(): 

291 """Create and upload a SNEWS-like MDC external event. 

292 

293 Returns 

294 ------- 

295 ext_event : str 

296 XML GCN notice alert packet in string format 

297 """ 

298 current_time = Time(Time.now(), format='gps').value 

299 ext_event = create_upload_external_event(current_time, 'SNEWS', 'MDC') 

300 return ext_event