Coverage for gwcelery/tasks/notice_text.py: 15%

107 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-04-25 18:01 +0000

1"""Tasks to validate the GCN Notice types of the e-mail formats [GCN e-mail]_. 

2 

3References 

4---------- 

5.. [GCN e-mail] https://gcn.gsfc.nasa.gov/lvc.html#tc13 

6 

7""" 

8import email 

9import email.policy 

10from math import isclose 

11 

12import lxml.etree 

13from celery.utils.log import get_task_logger 

14 

15from .. import app 

16from ..email.signals import email_received 

17from . import gracedb 

18 

19log = get_task_logger(__name__) 

20 

21 

22def _trigger_datetime(gcn_notice_mail): 

23 """Get trigger data and time from a GCN email notice.""" 

24 

25 # We now add Z to ISOtime 

26 # TRIGGER_DATE: 20123 TJD; 179 DOY; 2023/06/28 (yyyy/mm/dd) 

27 # TRIGGER_TIME: 83520.000000 SOD {23:12:00.000000} UT 

28 # <ISOTime>2023-06-28T23:12:00Z</ISOTime> 

29 trigger_date = gcn_notice_mail[ 

30 "TRIGGER_DATE"].split()[4].replace("/", "-") 

31 

32 # FIXME: replace with a regular expression. 

33 trigger_time = gcn_notice_mail["TRIGGER_TIME"].split()[2] 

34 trigger_time = trigger_time.replace("{", "").replace("}", "") 

35 trigger_time = trigger_time.split('.')[0] 

36 

37 trigger_datetime = (f'{trigger_date}T{trigger_time}Z') 

38 

39 return trigger_datetime 

40 

41 

def _notice_value(gcn_notice_mail, key):
    """Return the notice field *key*, raising KeyError when it is absent."""
    value = gcn_notice_mail[key]
    if value is None:
        # email.message.Message returns None for a header that is not
        # present instead of raising; turn that into a KeyError.
        raise KeyError(key)
    return value


def _vo_match_notice(gcn_notice_mail, params_vo, trigger_time_vo):
    """Match the notice-email and the VOtable keywords.

    Parameters
    ----------
    gcn_notice_mail : mapping
        Parsed e-mail notice, indexable by the GCN keywords.
    params_vo : dict
        ``Param`` names and values of the VOEvent, as strings.
    trigger_time_vo : str
        Content of the VOEvent ``ISOTime`` element.

    Returns
    -------
    dict
        Maps each checked notice keyword to ``True`` when the notice agrees
        with the VOEvent and to ``False`` otherwise. For a retraction only
        ``TRIGGER_DATETIME`` and ``SEQUENCE_NUM`` are checked.

    Raises
    ------
    KeyError
        If a VOEvent parameter is missing, or if a notice keyword whose
        value has to be parsed is absent.

    """
    dict_checks = {}

    # TRIGGER_DATE+TRIGGER_TIME
    dict_checks['TRIGGER_DATETIME'] = (
        _trigger_datetime(gcn_notice_mail) == trigger_time_vo)

    # SEQUENCE_NUM
    sequence_num = _notice_value(gcn_notice_mail, "SEQUENCE_NUM").split()[0]
    dict_checks['SEQUENCE_NUM'] = (sequence_num == params_vo["Pkt_Ser_Num"])

    # A retraction carries none of the remaining keywords.
    if params_vo['AlertType'] == 'Retraction':
        return dict_checks

    # (notice keyword, VOtable keyword) pairs
    type_keys = (("GROUP_TYPE", "Group"),
                 ("PIPELINE_TYPE", "Pipeline"),
                 ("SEARCH_TYPE", "Search"))
    url_keys = (("SKYMAP_FITS_URL", "skymap_fits"),
                ("EVENTPAGE_URL", "EventPage"))
    cbc_keys = (("PROB_NS", "HasNS"),
                ("PROB_REMNANT", "HasRemnant"),
                ("PROB_BNS", "BNS"),
                ("PROB_NSBH", "NSBH"),
                ("PROB_BBH", "BBH"),
                ("PROB_TERRES", "Terrestrial"))
    burst_keys = (("CENTRAL_FREQ", "CentralFreq"),
                  ("DURATION", "Duration"))

    # FAR
    far_notice = float(_notice_value(gcn_notice_mail, "FAR").split()[0])
    dict_checks['FAR'] = isclose(
        far_notice, float(params_vo["FAR"]), rel_tol=0.001)

    # Group and pipeline types: the name is the third token of the value
    for notice_key, vo_key in type_keys:
        value_notice = _notice_value(gcn_notice_mail, notice_key).split()[2]
        dict_checks[notice_key] = (value_notice == params_vo[vo_key])

    # EventPage/EVENTPAGE_URL and skymap_fits/SKYMAP_FITS_URL
    # The raw value is compared, so with an email.message.Message (which
    # yields None for an absent header) a missing URL is a mismatch.
    for notice_key, vo_key in url_keys:
        dict_checks[notice_key] = (
            gcn_notice_mail[notice_key] == params_vo[vo_key])

    group = params_vo['Group']
    if group == 'CBC':
        # CBC classification and properties
        for notice_key, vo_key in cbc_keys:
            value_notice = float(
                _notice_value(gcn_notice_mail, notice_key).split()[0])
            dict_checks[notice_key] = isclose(
                value_notice, float(params_vo[vo_key]), abs_tol=0.01)
    elif group == 'Burst':
        # Burst Properties
        for notice_key, vo_key in burst_keys:
            value_notice = float(
                _notice_value(gcn_notice_mail, notice_key).split()[0])
            dict_checks[notice_key] = isclose(
                value_notice, float(params_vo[vo_key]), rel_tol=0.001)

    return dict_checks

112 

113 

114def _vo_match_comments(gcn_notice_mail, params_vo): 

115 """Check the notice-email comments for the contributed instruments.""" 

116 dict_check_comments = {} 

117 

118 comments_notice_mail = gcn_notice_mail.get_all("COMMENTS") 

119 instruments_vo = params_vo["Instruments"] 

120 

121 text = ' contributed to this candidate event.' 

122 gcn_to_vo_instruments = {'LIGO-Hanford Observatory': 'H1', 

123 'LIGO-Livingston Observatory': 'L1', 

124 'VIRGO Observatory': 'V1'} 

125 

126 instrument_comments = (line.strip() for line in comments_notice_mail) 

127 instruments_gcn = {gcn_to_vo_instruments[line[:-len(text)]] 

128 for line in instrument_comments if line.endswith(text)} 

129 

130 instruments_vo = set(instruments_vo.split(',')) 

131 match_instruments = (instruments_gcn == instruments_vo) 

132 dict_check_comments["INSTRUMENT"] = match_instruments 

133 

134 return dict_check_comments 

135 

136 

@email_received.connect
def on_email_received(rfc822, **kwargs):
    """Parse the received RFC822 bytes and hand them to the validation task.

    Connected to the ``email_received`` signal. Extra keyword arguments
    are accepted and ignored.
    """
    parsed = email.message_from_bytes(rfc822, policy=email.policy.default)
    signature = validate_text_notice.s(parsed)
    signature.delay()

142 

143 

@app.task(shared=False)
def validate_text_notice(message):
    """Validate LIGO/Virgo GCN e-mail notice format.

    Compare the contents of a public LIGO/Virgo GCN e-mail notice with the
    VOEvent of the same name stored in GraceDB. The e-mail body is uploaded
    to GraceDB, and the VOEvent file is tagged ``gcn_email_ok`` or
    ``gcn_email_notok`` according to the outcome; in the latter case a log
    message describing the problem is uploaded as well.

    Messages whose sender is not BACODINE are logged and ignored.

    """
    # Filter on the sender address
    if message['From'] != 'Bacodine <vxw@capella2.gsfc.nasa.gov>':
        log.info('Email is not from BACODINE. Subject:%s', message['Subject'])
        log.info('Sender is: %s', message['From'])
        return

    log.info('Validating Notice: Subject:%s', message['Subject'])

    # The body of the e-mail is itself a list of "KEYWORD: value" lines,
    # so it is parsed like a block of e-mail headers.
    bodymsg = message.get_payload()
    notice = email.message_from_string(bodymsg)

    # The notice type is the last word of NOTICE_TYPE, or the one before it
    # when the last word is "Skymap".
    type_words = notice['NOTICE_TYPE'].split(" ")
    if type_words[-1] == "Skymap":
        notice_type = type_words[-2]
    else:
        notice_type = type_words[-1]

    # The e-mail notice spells EarlyWarning as Early_Warning, while the
    # VOEvent name has no underscore
    # (e.g. ivo://gwnet/LVC#S231030av-1-EarlyWarning).
    notice_type = notice_type.replace('_', '')

    # GraceDB id and sequence number identify the VOEvent file
    trigger_num = notice['TRIGGER_NUM']
    sequence_num = notice['SEQUENCE_NUM']
    filename = f'{trigger_num}-{sequence_num}-{notice_type}.xml'

    # Download and parse the VOEvent
    payload = gracedb.download(filename, trigger_num)
    root = lxml.etree.fromstring(payload)

    params_vo = {}
    for elem in root.iterfind('.//Param'):
        name = elem.attrib['name']
        params_vo[name] = elem.attrib['value']

    trigger_time_vo = root.findtext('.//ISOTime')

    # Keep a copy of the e-mail notice next to the VOEvent
    filename_email = 'email_' + filename.replace('.xml', '.txt')
    gracedb.upload.delay(bodymsg, filename_email, trigger_num,
                         f'email notice corresponding to {filename}',
                         tags=['em_follow'])

    # Match the notice against the VOEvent
    error = None
    try:
        if notice_type == 'Retraction':
            match = _vo_match_notice(notice, params_vo, trigger_time_vo)
        elif params_vo['Group'] in ("CBC", "Burst"):
            match = dict(_vo_match_notice(notice, params_vo, trigger_time_vo))
            match.update(_vo_match_comments(notice, params_vo))
        else:
            match = {}
            error = f'Email notice {filename} has unknown notice type'

        failed_keys = [key for key, matched in match.items() if not matched]
        if failed_keys:
            mismatched = ' '.join(failed_keys)
            error = \
                f'Email notice {filename} has mismatched keys: {mismatched}'
    except KeyError as err:
        # A keyword needed for the comparison was not found
        error = f'Email notice {filename} missing key: {err}'
    except Exception as err:
        # Any other failure is recorded as well, so that the task does not
        # stop before the VOEvent has been tagged
        error = f'Email notice {filename} generated exception: {err}'

    # Tag the VOEvent with the outcome
    tag_name = 'gcn_email_notok' if error else 'gcn_email_ok'
    gracedb.create_tag.delay(filename, tag_name, trigger_num)
    if error:
        gracedb.upload.delay(None, None, trigger_num,
                             error, tags=['em_follow'])

227 gracedb.upload.delay(None, None, trigger_num, 

228 error, tags=['em_follow']) 

229 else: 

230 gracedb.create_tag.delay(filename, 'gcn_email_ok', trigger_num)