python3实现ftp服务功能(服务端 For Linux)

 更新时间:2017年03月24日 16:56:16   投稿:lijiao  
这篇文章主要介绍了python3实现ftp服务功能,服务端 For Linux,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

本文实例为大家分享了python3实现ftp服务功能的具体代码,供大家参考,具体内容如下

功能介绍:

可执行的命令:

ls
pwd
cd
put
rm
get
mkdir

1、用户加密认证

2、允许多用户同时登陆

3、每个用户有自己的家目录,且只可以访问自己的家目录

4、运行在自己家目录下随意切换目录

5、允许上传下载文件,且文件一致

6、传输过程中显示进度条
server main 代码:

# Author by Andy
# _*_ coding:utf-8 _*_
import os, sys, json, hashlib, socketserver, time

base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(base_dir)
from conf import userdb_set
class Ftp_server(socketserver.BaseRequestHandler):
 user_home_dir = ''
 def auth(self, *args):
  '''验证用户名及密码'''
  cmd_dic = args[0]
  username = cmd_dic["username"]
  password = cmd_dic["password"]
  f = open(userdb_set.userdb_set(), 'r')
  user_info = json.load(f)
  if username in user_info.keys():
   if password == user_info[username]:
    self.request.send('0'.encode())
    os.chdir('/home/%s' % username)
    self.user_home_dir = os.popen('pwd').read().strip()
    data = "%s login successed" % username
    self.loging(data)
   else:
    self.request.send('1'.encode())
    data = "%s login failed" % username
    self.loging(data)
    f.close
  else:
   self.request.send('1'.encode())
   data = "%s login failed" % username
   self.loging(data)
   f.close
   ##########################################

 def get(self, *args):
  '''给客户端传输文件'''
  request_code = {
   '0': 'file is ready to get',
   '1': 'file not found!'
  }
  cmd_dic = args[0]
  self.loging(json.dumps(cmd_dic))
  filename = cmd_dic["filename"]
  if os.path.isfile(filename):
   self.request.send('0'.encode('utf-8')) # 确认文件存在
   self.request.recv(1024)
   self.request.send(str(os.stat(filename).st_size).encode('utf-8'))
   self.request.recv(1024)
   m = hashlib.md5()
   f = open(filename, 'rb')
   for line in f:
    m.update(line)
    self.request.send(line)
   self.request.send(m.hexdigest().encode('utf-8'))
   print('From server:Md5 value has been sended!')
   f.close()
  else:
   self.request.send('1'.encode('utf-8'))
   ###########################################

 def cd(self, *args):
  '''执行cd命令'''
  user_current_dir = os.popen('pwd').read().strip()
  cmd_dic = args[0]
  self.loging(json.dumps(cmd_dic))
  path = cmd_dic['path']
  if path.startswith('/'):
   if self.user_home_dir in path:
    os.chdir(path)
    new_dir = os.popen('pwd').read()
    user_current_dir = new_dir
    self.request.send('Change dir successfully!'.encode("utf-8"))
    data = 'Change dir successfully!'
    self.loging(data)
   elif os.path.exists(path):
    self.request.send('Permission Denied!'.encode("utf-8"))
    data = 'Permission Denied!'
    self.loging(data)
   else:
    self.request.send('Directory not found!'.encode("utf-8"))
    data = 'Directory not found!'
    self.loging(data)
  elif os.path.exists(path):
   os.chdir(path)
   new_dir = os.popen('pwd').read().strip()
   if self.user_home_dir in new_dir:
    self.request.send('Change dir successfully!'.encode("utf-8"))
    user_current_dir = new_dir
    data = 'Change dir successfully!'
    self.loging(data)
   else:
    os.chdir(user_current_dir)
    self.request.send('Permission Denied!'.encode("utf-8"))
    data = 'Permission Denied!'
    self.loging(data)
  else:
   self.request.send('Directory not found!'.encode("utf-8"))
   data = 'Directory not found!'
   self.loging(data)
   ###########################################

 def rm(self, *args):
  request_code = {
   '0': 'file exist,and Please confirm whether to rm',
   '1': 'file not found!'
  }
  cmd_dic = args[0]
  self.loging(json.dumps(cmd_dic))
  filename = cmd_dic['filename']
  if os.path.exists(filename):
   self.request.send('0'.encode("utf-8")) # 确认文件存在
   client_response = self.request.recv(1024).decode()
   if client_response == '0':
    os.popen('rm -rf %s' % filename)
    self.request.send(('File %s has been deleted!' % filename).encode("utf-8"))
    self.loging('File %s has been deleted!' % filename)
   else:
    self.request.send(('File %s not deleted!' % filename).encode("utf-8"))
    self.loging('File %s not deleted!' % filename)
  else:
   self.request.send('1'.encode("utf-8"))
   ########################################

 def pwd(self, *args):
  '''执行pwd命令'''
  cmd_dic = args[0]
  self.loging(json.dumps(cmd_dic))
  server_response = os.popen('pwd').read().strip().encode("utf-8")
  self.request.send(server_response)

 #############################################
 def ls(self, *args):
  '''执行ls命名'''
  cmd_dic = args[0]
  self.loging(json.dumps(cmd_dic))
  path = cmd_dic['path']
  cmd = 'ls -l %s' % path
  server_response = os.popen(cmd).read().encode("utf-8")
  self.request.send(server_response)

 ############################################
 def put(self, *args):
  '''接收客户端文件'''
  cmd_dic = args[0]
  self.loging(json.dumps(cmd_dic))
  filename = cmd_dic["filename"]
  filesize = cmd_dic["size"]
  if os.path.isfile(filename):
   f = open(filename + '.new', 'wb')
  else:
   f = open(filename, 'wb')
  request_code = {
   '200': 'Ready to recceive data!',
   '210': 'Not ready to received data!'
  }
  self.request.send('200'.encode())
  receive_size = 0
  while True:
   if receive_size < filesize:
    data = self.request.recv(1024)
    f.write(data)
    receive_size += len(data)
   else:
    data = "File %s has been uploaded successfully!" % filename
    self.loging(data)
    print(data)
    break

    ################################################

 def mkdir(self, *args):
  request_code = {
   '0': 'Directory has been made!',
   '1': 'Directory is aleady exist!'
  }
  cmd_dic = args[0]
  self.loging(json.dumps(cmd_dic))
  dir_name = cmd_dic['dir_name']
  if os.path.exists(dir_name):
   self.request.send('1'.encode("utf-8"))
  else:
   os.popen('mkdir %s' % dir_name)
   self.request.send('0'.encode("utf-8"))

   #############################################

  def loging(self, data):
  '''日志记录'''
  localtime = time.asctime(time.localtime(time.time()))
  log_file = '/root/ftp/ftpserver/log/server.log'
  with open(log_file, 'a', encoding='utf-8') as f:
   f.write('%s-->' % localtime + data + '\n')
   ##############################################

 def handle(self):
  # print("您本次访问使用的IP为:%s" %self.client_address[0])
  # localtime = time.asctime( time.localtime(time.time()))
  # print(localtime)

  while True:
   try:
    self.data = self.request.recv(1024).decode() #
    # print(self.data)
    cmd_dic = json.loads(self.data)
    action = cmd_dic["action"]
    # print("用户请求%s"%action)
    if hasattr(self, action):
     func = getattr(self, action)
     func(cmd_dic)
   except Exception as e:
    self.loging(str(e))
    break

def run():
 HOST, PORT = '0.0.0.0', 6969
 print("The server is started,and listenning at port 6969")
 server = socketserver.ThreadingTCPServer((HOST, PORT), Ftp_server)
 server.serve_forever()
if __name__ == '__main__':
 run()


设置用户口令代码:

#Author by Andy
#_*_ coding:utf-8 _*_
import os,json,hashlib,sys

base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
userdb_file = base_dir+"\data\\userdb"
# print(userdb_file)
def userdb_set():
 if os.path.isfile(userdb_file):
  # print(userdb_file)
  return userdb_file
 else:
  print('请先为您的服务器创建用户!')
  user_data = {}
  dict={}
  Exit_flags = True
  while Exit_flags:
   username = input("Please input username:")
   if username != 'exit':
    password = input("Please input passwod:")
    if password != 'exit':
      user_data.update({username:password})
      m = hashlib.md5()
      # m.update('hello')
      # print(m.hexdigest())
      for i in user_data:
       # print(i,user_data[i])
       m.update(user_data[i].encode())
       dict.update({i:m.hexdigest()})
    else:
     break
   else:
    break
  f = open(userdb_file,'w')
  json.dump(dict,f)
  f.close()
 return userdb_file

目录结构:

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

相关文章

  • 如何使用Python实现CartPole游戏

    如何使用Python实现CartPole游戏

    在深度强化学习内容的介绍中,提出了CartPole游戏进行深度强化学习,现在提供一种用Python简单实现Cart Pole游戏的方法,感兴趣的朋友跟随小编一起看看吧
    2024-07-07
  • Python制作CSDN免积分下载器

    Python制作CSDN免积分下载器

    本文给大家分享的是使用python实现的CSDN的免积分下载器,具体干嘛的,我相信你懂的~~~有需要的小伙伴自己来看看哈。
    2015-03-03
  • python题解LeetCode303区域和检索示例详解

    python题解LeetCode303区域和检索示例详解

    这篇文章主要为大家介绍了python题解LeetCode303区域和检索示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-12-12
  • pytorch 输出中间层特征的实例

    pytorch 输出中间层特征的实例

    今天小编就为大家分享一篇pytorch 输出中间层特征的实例,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2019-08-08
  • Python实现判断变量是否是函数方式

    Python实现判断变量是否是函数方式

    这篇文章主要介绍了Python实现判断变量是否是函数方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-02-02
  • python pprint模块中print()和pprint()两者的区别

    python pprint模块中print()和pprint()两者的区别

    这篇文章主要介绍了python pprint模块中print()和pprint()两者的区别,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-02-02
  • python 用for循环实现1~n求和的实例

    python 用for循环实现1~n求和的实例

    今天小编就为大家分享一篇python 用for循环实现1~n求和的实例,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2019-02-02
  • 简单说明Python中的装饰器的用法

    简单说明Python中的装饰器的用法

    这篇文章主要简单说明了Python中的装饰器的用法,装饰器在Python的进阶学习中非常重要,示例代码基于Python2.x,需要的朋友可以参考下
    2015-04-04
  • 13个有趣又好玩的Python游戏代码分享

    13个有趣又好玩的Python游戏代码分享

    今天小编跟大家分享13个有趣又好玩的Python小游戏示例代码,教你如何通过边打游戏边学编程!感兴趣的小伙伴快跟随小编一起学习起来
    2022-02-02
  • 关于python导入模块import与常见的模块详解

    关于python导入模块import与常见的模块详解

    今天小编就为大家分享一篇关于python导入模块import与常见的模块详解,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2019-08-08

最新评论