python实现封装得到virustotal扫描结果
更新时间:2014年10月05日 15:25:34 投稿:shichen2014
这篇文章主要介绍了python实现封装得到virustotal扫描结果的方法,是比较实用的技巧,可将扫描结果写入数据库,需要的朋友可以参考下
本文实例讲述了python实现封装得到virustotal扫描结果的方法(基于 VirusTotal 公共 API v2 的 file/report 与 file/scan 接口,把各杀软引擎的检出结果写入 SQLite)。注意:示例中的 APIKEY 只是占位符,需要换成自己申请的 Key,并且不要把真实 Key 写进源码或提交到代码库。分享给大家供大家参考。具体方法如下:
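# -*- coding: utf-8 -*-
"""Fetch VirusTotal (public API v2) file reports by hash and persist them in SQLite.

Two classes are exposed:

* ``VirusTotalDatabase`` -- a thin SQLite layer with one ``virustotal`` table
  holding, per MD5, the detection name reported by a fixed set of AV engines.
* ``VirusTotal`` -- a client for ``/vtapi/v2/file/report`` (and
  ``/vtapi/v2/file/scan`` via the article's ``postfile`` helper) that reduces
  the JSON answer to ``{engine: detection_name}`` and writes it through the
  database layer.

The API key is read from the ``VT_APIKEY`` environment variable so that a real
key is never committed with the source. The original listing hard-coded a
placeholder (``"xxxxxxxxxxxxxxxxxx"``) followed by the stray text "用自己的",
which made the module a syntax error.

The code is written to run unchanged on Python 2.7 and Python 3 (the article
targets Python 2: ``print`` statements, ``except X, e``, ``urllib2``).
"""
from __future__ import print_function

import json
import logging
import os
import re
import sys
from contextlib import closing

try:  # Python 3
    from urllib.parse import urlencode
    from urllib.request import Request, urlopen
except ImportError:  # Python 2
    from urllib import urlencode
    from urllib2 import Request, urlopen

try:
    import sqlite3
except ImportError:
    sys.stderr.write("ERROR: Unable to locate Python SQLite3 module. "
                     "Please verify your installation. Exiting...\n")
    sys.exit(-1)

# Sample hashes from the article. The original assigned MD5 twice; only the
# second assignment survived, so that is the value kept under the old name.
SAMPLE_MD5_WITH_REPORT = "5248f774d2ee0a10936d0b1dc89107f1"
MD5 = "12fa5fb74201d9b6a14f63fbf9a81ff6"  # did not have a report on virustotal.com

# Secret: never hard-code a real key here. Set VT_APIKEY in the environment
# (``export VT_APIKEY=...``). An empty key makes the request methods raise
# ValueError up front instead of sending a request VirusTotal would reject.
APIKEY = os.environ.get("VT_APIKEY", "")

# AV engines whose verdict is persisted, in table-column order. The order
# matters: _get_task_dict() maps ``SELECT *`` rows positionally, and the
# CREATE/INSERT/UPDATE statements below are generated from this tuple so the
# three can never drift apart. These are program constants, never user input.
ENGINES = ("Kaspersky", "McAfee", "Symantec", "Norman", "Avast",
           "NOD32", "BitDefender", "Microsoft", "Rising", "Panda")

_MD5_RE = re.compile(r"^[0-9a-fA-F]{32}$")


def _is_md5(value):
    """Return True if *value* is a 32-character hex string (an MD5 digest)."""
    try:
        return bool(_MD5_RE.match(value))
    except TypeError:  # None, bytes, ints, ...
        return False


class VirusTotalDatabase(object):
    """SQLite persistence for per-hash VirusTotal verdicts.

    The database file (and its directory) is created on first use. All
    queries bind their values with ``?`` placeholders; hash values and AV
    result strings come from the network and are never interpolated into
    SQL text. Only the column names, taken from the ENGINES constant, are
    formatted into the statements.
    """

    _TABLE_SQL = ("CREATE TABLE virustotal (\n"
                  "  id INTEGER PRIMARY KEY,\n"
                  "  md5 TEXT NOT NULL,\n"
                  + "".join("  %s TEXT DEFAULT NULL,\n" % e for e in ENGINES[:-1])
                  + "  %s TEXT DEFAULT NULL\n" % ENGINES[-1]
                  + ");")

    def __init__(self, db_file):
        """Open (creating it if needed) the SQLite database at *db_file*.

        Connection failures are reported on stdout and leave ``_cursor`` as
        None; every query method checks for that and returns None/False.
        """
        log = logging.getLogger("Database.Init")
        self.__dbfile = db_file
        self._conn = None
        self._cursor = None

        # Generate the schema first if the file does not exist yet.
        if not os.path.exists(self.__dbfile):
            if self._generate():
                print("Generated database \"%s\" which didn't"
                      " exist before." % self.__dbfile)
            else:
                print("Unable to generate database")

        try:
            self._conn = sqlite3.connect(self.__dbfile)
            # Hand TEXT columns back as native str. The original set this on
            # every add_sample() call; once at connect time is enough.
            self._conn.text_factory = str
            self._cursor = self._conn.cursor()
        except sqlite3.Error as why:
            print("Unable to connect to database \"%s\": %s." % (self.__dbfile, why))
            return
        log.debug("Connected to SQLite database \"%s\"." % self.__dbfile)

    def close(self):
        """Release the cursor and connection; safe to call more than once."""
        if self._cursor is not None:
            self._cursor.close()
            self._cursor = None
        if self._conn is not None:
            self._conn.close()
            self._conn = None

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        self.close()
        return False

    def _generate(self):
        """Create the database file and the ``virustotal`` table.

        Returns True on success, False if the file already exists or the
        directory/table could not be created. Errors are printed, not raised,
        matching the rest of this class.
        """
        if os.path.exists(self.__dbfile):
            return False

        db_dir = os.path.dirname(self.__dbfile)
        # dirname() is "" for a bare file name such as "vt.db"; the original
        # then called os.makedirs("") which raises, so guard on db_dir first.
        if db_dir and not os.path.exists(db_dir):
            try:
                os.makedirs(db_dir)
            except (IOError, OSError) as why:
                print("Something went wrong while creating database "
                      "directory \"%s\": %s" % (db_dir, why))
                return False

        try:
            # closing() guarantees the bootstrap connection is released even
            # if the CREATE fails; the original left it open.
            with closing(sqlite3.connect(self.__dbfile)) as conn:
                conn.execute(self._TABLE_SQL)
                conn.commit()
        except sqlite3.Error as why:
            print("Unable to create database \"%s\": %s." % (self.__dbfile, why))
            return False

        print("create db:%s sucess" % self.__dbfile)
        return True

    def _get_task_dict(self, row):
        """Map a ``SELECT *`` row (id, md5, <ENGINES...>) to a dict.

        Returns None when *row* is None or too short to hold every column,
        so a schema mismatch does not surface as an IndexError in callers.
        """
        if row is None or len(row) < 2 + len(ENGINES):
            return None
        task = {"id": row[0], "md5": row[1]}
        task.update(zip(ENGINES, row[2:2 + len(ENGINES)]))
        return task

    def add_sample(self, md5, virus_dict):
        """Insert or update the verdicts for *md5* (stored lower-cased).

        *virus_dict* maps engine name -> detection name; engines missing from
        it are stored as NULL, which is also how a previously stored verdict
        is cleared when a later report no longer flags the sample. Engines not
        listed in ENGINES are ignored.

        Returns the row id on success, None when there is no cursor, the hash
        is not a 32-hex-digit string (the same rule search_md5() applies) or
        the insert failed, and False when the lookup or update failed.
        """
        if not self._cursor:
            return None
        if not _is_md5(md5):
            return None
        md5 = md5.lower()
        if virus_dict is None:
            virus_dict = {}

        # One bound value per engine, in column order; missing -> NULL.
        verdicts = tuple(virus_dict.get(engine) for engine in ENGINES)

        try:
            self._cursor.execute("SELECT id FROM virustotal WHERE md5 = ?;", (md5,))
            sample_row = self._cursor.fetchone()
        except sqlite3.OperationalError as why:
            print("sqlite3 error:%s\n" % str(why))
            return False

        if sample_row:
            task_id = sample_row[0]
            # Column names come from the ENGINES constant; values are bound.
            set_clause = ", ".join("%s = ?" % engine for engine in ENGINES)
            try:
                self._cursor.execute(
                    "UPDATE virustotal SET %s WHERE id = ?;" % set_clause,
                    verdicts + (task_id,))
                self._conn.commit()
            except sqlite3.OperationalError as why:
                print("Unable to update database: %s." % why)
                return False
        else:
            # The sample is not in the database yet.
            columns = ", ".join(("md5",) + ENGINES)
            placeholders = ", ".join("?" * (1 + len(ENGINES)))
            try:
                self._cursor.execute(
                    "INSERT INTO virustotal (%s) VALUES (%s);" % (columns, placeholders),
                    (md5,) + verdicts)
                self._conn.commit()
                task_id = self._cursor.lastrowid
            except sqlite3.OperationalError as why:
                print("Unable to insert into database: %s." % why)
                return None

        print("add_to_db:%s, task_id:%s" % (str(self.__dbfile), str(task_id)))
        return task_id

    def get_sample(self):
        """Return the oldest stored row as a dict (see _get_task_dict), or None.

        The original ordered by a non-existent ``added_on`` column, so every
        call failed with OperationalError; ``id`` is the only ordering the
        schema supports.
        """
        log = logging.getLogger("Database.GetTask")
        if not self._cursor:
            log.error("Unable to acquire cursor.")
            return None

        try:
            self._cursor.execute("SELECT * FROM virustotal ORDER BY id LIMIT 1;")
        except sqlite3.OperationalError as why:
            log.error("Unable to query database: %s." % why)
            return None

        row = self._cursor.fetchone()
        return self._get_task_dict(row) if row else None

    def search_md5(self, md5):
        """Look up the stored verdicts for *md5*.

        Returns the row as a dict, ``{}`` when the hash is unknown, and None
        when there is no cursor, the hash is not a 32-hex-digit string, or
        the query failed.
        """
        if not self._cursor:
            return None
        if not _is_md5(md5):
            return None

        try:
            self._cursor.execute(
                "SELECT * FROM virustotal WHERE md5 = ? ORDER BY id DESC;",
                (md5.lower(),))
        except sqlite3.OperationalError:
            return None

        rows = self._cursor.fetchall()
        if not rows:
            return {}
        # add_sample() keeps a single row per hash, so the newest row is the
        # answer; the original looped over all rows and kept only the last.
        return self._get_task_dict(rows[0])


class VirusTotal(object):
    """Client for one sample hash against the VirusTotal public API v2.

    Constructing an instance fetches the file report immediately (one API
    request) and caches it in ``_virus_dict``; write_to_db() reuses that
    cache instead of querying again as the original did.
    """

    REPORT_URL = "https://www.virustotal.com/vtapi/v2/file/report"
    SCAN_HOST = "www.virustotal.com"
    SCAN_SELECTOR = "https://www.virustotal.com/vtapi/v2/file/scan"
    TIMEOUT = 30  # seconds; the original had no timeout and could hang forever

    def __init__(self, md5):
        """Fetch and cache the report for *md5*; the DB path is ./db/virustotal.db."""
        self._md5 = md5
        self._db_file = r"./db/virustotal.db"
        self._virus_dict = self.get_report_dict()

    def __repr__(self):
        return str(self._virus_dict)

    # The original exposed this as a plain method literally named ``repr``
    # (so built-in repr() never used it); keep that name for callers.
    repr = __repr__

    @staticmethod
    def _require_apikey():
        """Return the configured API key or raise ValueError when it is unset."""
        if not APIKEY:
            raise ValueError("VirusTotal API key missing: set the VT_APIKEY "
                             "environment variable")
        return APIKEY

    def submit_md5(self, file_path):
        """Upload the file at *file_path* to the v2 scan endpoint.

        Uses the article's external ``postfile`` helper (not in the stdlib)
        to build the multipart request, so the import stays local to this
        method. The whole file is read into memory, as in the original, but
        the handle is now closed before the upload starts. Returns whatever
        ``postfile.post_multipart`` returns (the original printed it and
        returned None).

        NOTE(review): whether ``postfile`` actually speaks TLS to SCAN_HOST
        is not visible here -- verify, since the API key travels in the body.
        """
        import postfile  # third-party helper shipped with the article

        file_name = os.path.basename(file_path)
        with open(file_path, "rb") as handle:
            file_to_send = handle.read()

        fields = [("apikey", self._require_apikey())]
        files = [("file", file_name, file_to_send)]
        response = postfile.post_multipart(self.SCAN_HOST, self.SCAN_SELECTOR,
                                           fields, files)
        print(response)
        return response

    def get_report_dict(self):
        """Query the v2 file/report endpoint for ``self._md5``.

        Returns ``{engine: detection_name}`` for every engine that flagged the
        sample, or ``{}`` when VirusTotal reports a falsy ``response_code``
        (no report for this hash). Raises ValueError if the API key is unset
        or the reply is not JSON, and URLError/HTTPError on transport or
        authentication failures.
        """
        params = {"resource": self._md5, "apikey": self._require_apikey()}
        data = urlencode(params).encode("utf-8")
        req = Request(self.REPORT_URL, data)

        # NOTE(review): on Python 2 before 2.7.9 urllib2 does not verify the
        # server certificate, so the key and the verdicts are exposed to an
        # on-path attacker; run this on an interpreter that validates TLS.
        response = urlopen(req, timeout=self.TIMEOUT)
        try:
            body = response.read()
        finally:
            response.close()

        return self._parse_report(json.loads(body.decode("utf-8")))

    @staticmethod
    def _parse_report(response_dict):
        """Reduce a v2 file/report JSON object to ``{engine: detection_name}``.

        Only engines whose ``detected`` flag is true are kept; a falsy
        ``response_code`` (the original's "has result" check) or a missing
        ``scans`` section yields ``{}``. Malformed entries are skipped rather
        than raising KeyError as the original did.
        """
        result = {}
        if not isinstance(response_dict, dict) or not response_dict.get("response_code"):
            return result

        scans = response_dict.get("scans") or {}
        if not isinstance(scans, dict):
            return result

        for engine, verdict in scans.items():
            if isinstance(verdict, dict) and verdict.get("detected"):
                result.setdefault(engine, verdict.get("result"))
        return result

    def write_to_db(self):
        """Persist the cached report through VirusTotalDatabase.

        Returns the value of ``add_sample`` (row id, None or False). The
        connection is closed when done; the original left it open and issued
        a second, redundant API request here.
        """
        with VirusTotalDatabase(self._db_file) as db:
            return db.add_sample(self._md5, self._virus_dict)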
使用方法如下:
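"""Read MD5 hashes (one per line) from ``inputMd5s`` and store each sample's
VirusTotal report in the SQLite database via ``VirusTotal.write_to_db``.

The original snippet used ``ifilter``, ``imap`` and ``string.strip`` without
importing them and left the input file open; both are fixed here.
"""
from __future__ import print_function


def load_hashes(path):
    """Return the non-empty, whitespace-stripped lines of *path*, in order."""
    with open(path, "r") as fp:
        return [line.strip() for line in fp if line.strip()]


def main():
    config = {'input': "inputMd5s"}
    md5s = load_hashes(config['input'])
    print("MD5S", md5s)

    # Imported here so the snippet can be read and compiled without the module.
    from getVirusTotalInfo import VirusTotal

    # Fetch each report and write it to the database. Every VirusTotal(md5)
    # issues one network request; a long list means many requests in a row.
    for md5 in md5s:
        virus_total = VirusTotal(md5)
        virus_total.write_to_db()


if __name__ == "__main__":
    main()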
希望本文所述对大家的Python程序设计有所帮助。