#!/usr/bin/env python # Copyright 2003, 2004 Petru Paler (petru@paler.net) # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA from socket import socket, timeout, inet_aton, AF_INET, SOCK_DGRAM, AF_INET6, inet_pton, gethostbyname from struct import pack, unpack from random import randint import DNS # http://pydns.sourceforge.net DNS.ParseResolvConf() SIQ_SERVER = "db.outboundindex.net" SIQ_PORT = 6264 SIQ_TIMEOUT = 10 SIQ_RETRIES = 10 # Currently using version draft 18 SIQ_VERSION = 1 SIQD_UNKNOWN = -1 SIQD_TEMPFAIL = -2 SIQD_REDIRECT = -3 M_HL = 128 M_ML = 64 def _get_qid(): return randint(0, 65535) def response(score, text, hl=False, ml=False, ipscore=-1, domscore=-1, relscore=-1): return (score, text, ipscore, domscore, relscore, hl, ml) def listEndsWith(l1, l2): if len(l2) > len(l1): return False l1 = l1[:] l2 = l2[:] l1.reverse(); l2.reverse() for i,x in enumerate(l2): if not l1[i] == x: return False return True def lookupAddresses(h): req = DNS.DnsRequest(name=h, qtype='A') try: a = req.req() except: return [] return [x['data'] for x in a.answers if x['typename'] == 'A'] def siq_query(domain, ip, server=SIQ_SERVER, helodom=None): """ Return (score, text, ip_score, domain_score, relationship_score, hl, ml). """ qid = _get_qid() ## Construct the query. flags = 0 hl = ml = False if helodom: # Check HL and ML if listEndsWith(helodom.lower().split('.'), domain.lower().split('.')): flags = flags | M_ML ml = True if ip in lookupAddresses(helodom): flags = flags | M_HL hl = True payload = chr(SIQ_VERSION) + chr(flags) payload += pack('!H', qid) # see RFC 2373 section 2.5.4 for desc of next line payload += '\0'*12 + inet_pton(AF_INET, ip) payload += chr(len(domain)) payload += chr(0) # rdomain payload += domain # qdomain ## Send the query. s = socket(AF_INET, SOCK_DGRAM) s.sendto(payload, (server, SIQ_PORT)) s.settimeout(SIQ_TIMEOUT) ## Receive the response. retrycount = 0 while retrycount < SIQ_RETRIES: try: r = s.recvfrom(255)[0] break except timeout: retrycount += 1 if retrycount == SIQ_RETRIES: return response(SIQD_UNKNOWN, "Timed out (tried %d times)" % (retrycount,)) if len(r) == 0: return response(SIQD_UNKNOWN, "Empty server response") ## Parse and check the response. ver = ord(r[0]) if ver != SIQ_VERSION: return response(SIQD_UNKNOWN, "Version mismatch (got %d, expected %d)" % (ver, SIQ_VERSION)) score = unpack('!b', r[1])[0] if not (-3 <= score <= 100): return response(SIQD_UNKNOWN, "Undefined score (got %r)" % (score,)) rqid = unpack('!H', r[2:4])[0] if rqid != qid: return response(SIQD_UNKNOWN, "Query id mismatch (got %d, expected %d)" % (rqid, qid)) secondary_scores = [] for name, i in [('ip', 4), ('domain', 5), ('relationship', 6)]: secondary_score = unpack('!b', r[i])[0] secondary_scores.append(secondary_score) if not (-1 <= secondary_score <= 100): return response(SIQD_UNKNOWN, "Undefined %s score (got %d)" % (name, secondary_score)) tlen = ord(r[7]) rtxt = r[8:] if tlen != len(rtxt): return response(SIQD_UNKNOWN, "Wrong length of domain sent (sent %d, actual %d)" % (tlen, len(rtxt))) return response(score, rtxt, hl, ml, *secondary_scores) def _test(domain, ip, server, helodom=None): if server is None: server = SIQ_SERVER print "Checking %s from %s (server %s)" % (domain, ip, server) result = siq_query(domain, ip, server, helodom) print "score=%d, text=%s, ipscore=%d, domscore=%d, relscore=%d, hl=%s, ml=%s" % result if __name__ == "__main__": from optparse import OptionParser usage = "Usage: %prog [-h] [-s ] " parser = OptionParser(usage) parser.add_option("-s", "--server", type="string", dest="server", help="SIQ server to use") parser.add_option("-d", "--helodom", type="string", dest="helodom", help="HELO domain") (options, args) = parser.parse_args() if len(args) != 2: parser.error("incorrect number of arguments") _test(args[0], args[1], options.server, options.helodom)