使用python生成大量数据写入es数据库并查询操作(2)

 更新时间:2022年09月22日 10:08:00   作者:IT之一小  
这篇文章主要介绍了使用python生成大量数据写入es数据库并查询操作,文章围绕主题展开详细的内容介绍,具有一定的参考价值,需要的小伙伴可以参考一下

前言 :

上一篇文章:如何使用python生成大量数据写入es数据库并查询操作

模拟学生个人信息写入es数据库,包括姓名、性别、年龄、特点、科目、成绩,创建时间。

方案一

在写入数据时未提前创建索引mapping,而是每插入一条数据都包含了索引的信息。

示例代码:【多线程写入数据】【一次性写入10000*1000条数据】  【本人亲测耗时3266秒】

from elasticsearch import Elasticsearch
from elasticsearch import helpers
from datetime import datetime
from queue import Queue
import random
import time
import threading
es = Elasticsearch(hosts='http://127.0.0.1:9200')
# print(es)
 
names = ['刘一', '陈二', '张三', '李四', '王五', '赵六', '孙七', '周八', '吴九', '郑十']
sexs = ['男', '女']
age = [25, 28, 29, 32, 31, 26, 27, 30]
character = ['自信但不自负,不以自我为中心',
             '努力、积极、乐观、拼搏是我的人生信条',
             '抗压能力强,能够快速适应周围环境',
             '敢做敢拼,脚踏实地;做事认真负责,责任心强',
             '爱好所学专业,乐于学习新知识;对工作有责任心;踏实,热情,对生活充满激情',
             '主动性强,自学能力强,具有团队合作意识,有一定组织能力',
             '忠实诚信,讲原则,说到做到,决不推卸责任',
             '有自制力,做事情始终坚持有始有终,从不半途而废',
             '肯学习,有问题不逃避,愿意虚心向他人学习',
             '愿意以谦虚态度赞扬接纳优越者,权威者',
             '会用100%的热情和精力投入到工作中;平易近人',
             '为人诚恳,性格开朗,积极进取,适应力强、勤奋好学、脚踏实地',
             '有较强的团队精神,工作积极进取,态度认真']
subjects = ['语文', '数学', '英语', '生物', '地理']
grades = [85, 77, 96, 74, 85, 69, 84, 59, 67, 69, 86, 96, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86]
create_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
 
def save_to_es(num):
    """
    批量写入数据到es数据库
    :param num:
    :return:
    """
    start = time.time()
    action = [
        {
            "_index": "personal_info_10000000",
            "_type": "doc",
            "_id": i,
            "_source": {
                "id": i,
                "name": random.choice(names),
                "sex": random.choice(sexs),
                "age": random.choice(age),
                "character": random.choice(character),
                "subject": random.choice(subjects),
                "grade": random.choice(grades),
                "create_time": create_time
            }
        } for i in range(10000 * num, 10000 * num + 10000)
    ]
    helpers.bulk(es, action)
    end = time.time()
    print(f"{num}耗时{end - start}s!")
 
def run():
    global queue
    while queue.qsize() > 0:
        num = queue.get()
        print(num)
        save_to_es(num)

if __name__ == '__main__':
    start = time.time()
    queue = Queue()
    # 序号数据进队列
    for num in range(1000):
        queue.put(num)
 
    # 多线程执行程序
    consumer_lst = []
    for _ in range(10):
        thread = threading.Thread(target=run)
        thread.start()
        consumer_lst.append(thread)
    for consumer in consumer_lst:
        consumer.join()
    end = time.time()
    print('程序执行完毕!花费时间:', end - start)

运行结果:

 自动创建的索引mapping:

GET personal_info_10000000/_mapping
{
  "personal_info_10000000" : {
    "mappings" : {
      "properties" : {
        "age" : {
          "type" : "long"
        },
        "character" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "create_time" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "grade" : {
          "type" : "long"
        },
        "id" : {
          "type" : "long"
        },
        "name" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "sex" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "subject" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        }
      }
    }
  }
}

方案二

1.顺序插入5000000条数据

先创建索引personal_info_5000000,确定好mapping后,再插入数据。

新建索引并设置mapping信息:

PUT personal_info_5000000
{
  "settings": {
    "number_of_shards": 3,
    "number_of_replicas": 1
  },
  "mappings": {
    "properties": {
      "id": {
        "type": "long"
      },
      "name": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 32
          }
        }
      },
      "sex": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 8
          }
        }
      },
      "age": {
        "type": "long"
      },
      "character": {
        "type": "text",
        "analyzer": "ik_smart",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "subject": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "grade": {
        "type": "long"
      },
      "create_time": {
        "type": "date",
        "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
      }
    }
  }
}

查看新建索引信息:

GET personal_info_5000000
 
{
  "personal_info_5000000" : {
    "aliases" : { },
    "mappings" : {
      "properties" : {
        "age" : {
          "type" : "long"
        },
        "character" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          },
          "analyzer" : "ik_smart"
        },
        "create_time" : {
          "type" : "date",
          "format" : "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
        },
        "grade" : {
          "type" : "long"
        },
        "id" : {
          "type" : "long"
        },
        "name" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 32
            }
          }
        },
        "sex" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 8
            }
          }
        },
        "subject" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        }
      }
    },
    "settings" : {
      "index" : {
        "routing" : {
          "allocation" : {
            "include" : {
              "_tier_preference" : "data_content"
            }
          }
        },
        "number_of_shards" : "3",
        "provided_name" : "personal_info_50000000",
        "creation_date" : "1663471072176",
        "number_of_replicas" : "1",
        "uuid" : "5DfmfUhUTJeGk1k4XnN-lQ",
        "version" : {
          "created" : "7170699"
        }
      }
    }
  }
}

开始插入数据:

示例代码: 【单线程写入数据】【一次性写入10000*500条数据】  【本人亲测耗时7916秒】

from elasticsearch import Elasticsearch
from datetime import datetime
from queue import Queue
import random
import time
import threading
es = Elasticsearch(hosts='http://127.0.0.1:9200')
# print(es)
names = ['刘一', '陈二', '张三', '李四', '王五', '赵六', '孙七', '周八', '吴九', '郑十']
sexs = ['男', '女']
age = [25, 28, 29, 32, 31, 26, 27, 30]
character = ['自信但不自负,不以自我为中心',
             '努力、积极、乐观、拼搏是我的人生信条',
             '抗压能力强,能够快速适应周围环境',
             '敢做敢拼,脚踏实地;做事认真负责,责任心强',
             '爱好所学专业,乐于学习新知识;对工作有责任心;踏实,热情,对生活充满激情',
             '主动性强,自学能力强,具有团队合作意识,有一定组织能力',
             '忠实诚信,讲原则,说到做到,决不推卸责任',
             '有自制力,做事情始终坚持有始有终,从不半途而废',
             '肯学习,有问题不逃避,愿意虚心向他人学习',
             '愿意以谦虚态度赞扬接纳优越者,权威者',
             '会用100%的热情和精力投入到工作中;平易近人',
             '为人诚恳,性格开朗,积极进取,适应力强、勤奋好学、脚踏实地',
             '有较强的团队精神,工作积极进取,态度认真']
subjects = ['语文', '数学', '英语', '生物', '地理']
grades = [85, 77, 96, 74, 85, 69, 84, 59, 67, 69, 86, 96, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86]
create_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
 
# 添加程序耗时的功能
def timer(func):
    def wrapper(*args, **kwargs):
        start = time.time()
        res = func(*args, **kwargs)
        end = time.time()
        print('id{}共耗时约 {:.2f} 秒'.format(*args, end - start))
        return res
 
    return wrapper
 
@timer
def save_to_es(num):
    """
    顺序写入数据到es数据库
    :param num:
    :return:
    """
    body = {
        "id": num,
        "name": random.choice(names),
        "sex": random.choice(sexs),
        "age": random.choice(age),
        "character": random.choice(character),
        "subject": random.choice(subjects),
        "grade": random.choice(grades),
        "create_time": create_time
    }
    # 此时若索引不存在时会新建
    es.index(index="personal_info_5000000", id=num, doc_type="_doc", document=body)
 
def run():
    global queue
    while queue.qsize() > 0:
        num = queue.get()
        print(num)
        save_to_es(num)
 
if __name__ == '__main__':
    start = time.time()
    queue = Queue()
    # 序号数据进队列
    for num in range(5000000):
        queue.put(num)
 
    # 多线程执行程序
    consumer_lst = []
    for _ in range(10):
        thread = threading.Thread(target=run)
        thread.start()
        consumer_lst.append(thread)
    for consumer in consumer_lst:
        consumer.join()
    end = time.time()
    print('程序执行完毕!花费时间:', end - start)

运行结果:

2.批量插入5000000条数据

先创建索引personal_info_5000000_v2,确定好mapping后,再插入数据。

新建索引并设置mapping信息:

PUT personal_info_5000000_v2
{
  "settings": {
    "number_of_shards": 3,
    "number_of_replicas": 1
  },
  "mappings": {
    "properties": {
      "id": {
        "type": "long"
      },
      "name": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 32
          }
        }
      },
      "sex": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 8
          }
        }
      },
      "age": {
        "type": "long"
      },
      "character": {
        "type": "text",
        "analyzer": "ik_smart",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "subject": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "grade": {
        "type": "long"
      },
      "create_time": {
        "type": "date",
        "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
      }
    }
  }
}

查看新建索引信息:

GET personal_info_5000000_v2
 
{
  "personal_info_5000000_v2" : {
    "aliases" : { },
    "mappings" : {
      "properties" : {
        "age" : {
          "type" : "long"
        },
        "character" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          },
          "analyzer" : "ik_smart"
        },
        "create_time" : {
          "type" : "date",
          "format" : "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
        },
        "grade" : {
          "type" : "long"
        },
        "id" : {
          "type" : "long"
        },
        "name" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 32
            }
          }
        },
        "sex" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 8
            }
          }
        },
        "subject" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        }
      }
    },
    "settings" : {
      "index" : {
        "routing" : {
          "allocation" : {
            "include" : {
              "_tier_preference" : "data_content"
            }
          }
        },
        "number_of_shards" : "3",
        "provided_name" : "personal_info_5000000_v2",
        "creation_date" : "1663485323617",
        "number_of_replicas" : "1",
        "uuid" : "XBPaDn_gREmAoJmdRyBMAA",
        "version" : {
          "created" : "7170699"
        }
      }
    }
  }
}

批量插入数据:

通过elasticsearch模块导入helper,通过helper.bulk来批量处理大量的数据。首先将所有的数据定义成字典形式,各字段含义如下:

  • _index对应索引名称,并且该索引必须存在。
  • _type对应类型名称。
  • _source对应的字典内,每一篇文档的字段和值,可有有多个字段。

示例代码:  【程序中途异常,写入4714000条数据】

from elasticsearch import Elasticsearch
from elasticsearch import helpers
from datetime import datetime
from queue import Queue
import random
import time
import threading
es = Elasticsearch(hosts='http://127.0.0.1:9200')
# print(es)
names = ['刘一', '陈二', '张三', '李四', '王五', '赵六', '孙七', '周八', '吴九', '郑十']
sexs = ['男', '女']
age = [25, 28, 29, 32, 31, 26, 27, 30]
character = ['自信但不自负,不以自我为中心',
             '努力、积极、乐观、拼搏是我的人生信条',
             '抗压能力强,能够快速适应周围环境',
             '敢做敢拼,脚踏实地;做事认真负责,责任心强',
             '爱好所学专业,乐于学习新知识;对工作有责任心;踏实,热情,对生活充满激情',
             '主动性强,自学能力强,具有团队合作意识,有一定组织能力',
             '忠实诚信,讲原则,说到做到,决不推卸责任',
             '有自制力,做事情始终坚持有始有终,从不半途而废',
             '肯学习,有问题不逃避,愿意虚心向他人学习',
             '愿意以谦虚态度赞扬接纳优越者,权威者',
             '会用100%的热情和精力投入到工作中;平易近人',
             '为人诚恳,性格开朗,积极进取,适应力强、勤奋好学、脚踏实地',
             '有较强的团队精神,工作积极进取,态度认真']
subjects = ['语文', '数学', '英语', '生物', '地理']
grades = [85, 77, 96, 74, 85, 69, 84, 59, 67, 69, 86, 96, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86]
create_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# 添加程序耗时的功能
def timer(func):
    def wrapper(*args, **kwargs):
        start = time.time()
        res = func(*args, **kwargs)
        end = time.time()
        print('id{}共耗时约 {:.2f} 秒'.format(*args, end - start))
        return res
 
    return wrapper
 
 
@timer
def save_to_es(num):
    """
    批量写入数据到es数据库
    :param num:
    :return:
    """
    action = [
        {
            "_index": "personal_info_5000000_v2",
            "_type": "_doc",
            "_id": i,
            "_source": {
                "id": i,
                "name": random.choice(names),
                "sex": random.choice(sexs),
                "age": random.choice(age),
                "character": random.choice(character),
                "subject": random.choice(subjects),
                "grade": random.choice(grades),
                "create_time": create_time
            }
        } for i in range(10000 * num, 10000 * num + 10000)
    ]
    helpers.bulk(es, action)
def run():
    global queue
    while queue.qsize() > 0:
        num = queue.get()
        print(num)
        save_to_es(num)
if __name__ == '__main__':
    start = time.time()
    queue = Queue()
    # 序号数据进队列
    for num in range(500):
        queue.put(num)
 
    # 多线程执行程序
    consumer_lst = []
    for _ in range(10):
        thread = threading.Thread(target=run)
        thread.start()
        consumer_lst.append(thread)
    for consumer in consumer_lst:
        consumer.join()
    end = time.time()
    print('程序执行完毕!花费时间:', end - start)

运行结果:

3.批量插入50000000条数据

先创建索引personal_info_5000000_v2,确定好mapping后,再插入数据。

此过程是在上面批量插入的前提下进行优化,采用python生成器。

建立索引和mapping同上,直接上代码:

示例代码: 【程序中途异常,写入3688000条数据】

from elasticsearch import Elasticsearch
from elasticsearch import helpers
from datetime import datetime
from queue import Queue
import random
import time
import threading
es = Elasticsearch(hosts='http://127.0.0.1:9200')
# print(es)
 
names = ['刘一', '陈二', '张三', '李四', '王五', '赵六', '孙七', '周八', '吴九', '郑十']
sexs = ['男', '女']
age = [25, 28, 29, 32, 31, 26, 27, 30]
character = ['自信但不自负,不以自我为中心',
             '努力、积极、乐观、拼搏是我的人生信条',
             '抗压能力强,能够快速适应周围环境',
             '敢做敢拼,脚踏实地;做事认真负责,责任心强',
             '爱好所学专业,乐于学习新知识;对工作有责任心;踏实,热情,对生活充满激情',
             '主动性强,自学能力强,具有团队合作意识,有一定组织能力',
             '忠实诚信,讲原则,说到做到,决不推卸责任',
             '有自制力,做事情始终坚持有始有终,从不半途而废',
             '肯学习,有问题不逃避,愿意虚心向他人学习',
             '愿意以谦虚态度赞扬接纳优越者,权威者',
             '会用100%的热情和精力投入到工作中;平易近人',
             '为人诚恳,性格开朗,积极进取,适应力强、勤奋好学、脚踏实地',
             '有较强的团队精神,工作积极进取,态度认真']
subjects = ['语文', '数学', '英语', '生物', '地理']
grades = [85, 77, 96, 74, 85, 69, 84, 59, 67, 69, 86, 96, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86]
create_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
 
# 添加程序耗时的功能
def timer(func):
    def wrapper(*args, **kwargs):
        start = time.time()
        res = func(*args, **kwargs)
        end = time.time()
        print('id{}共耗时约 {:.2f} 秒'.format(*args, end - start))
        return res
 
    return wrapper
@timer
def save_to_es(num):
    """
    使用生成器批量写入数据到es数据库
    :param num:
    :return:
    """
    action = (
        {
            "_index": "personal_info_5000000_v3",
            "_type": "_doc",
            "_id": i,
            "_source": {
                "id": i,
                "name": random.choice(names),
                "sex": random.choice(sexs),
                "age": random.choice(age),
                "character": random.choice(character),
                "subject": random.choice(subjects),
                "grade": random.choice(grades),
                "create_time": create_time
            }
        } for i in range(10000 * num, 10000 * num + 10000)
    )
    helpers.bulk(es, action)
 
def run():
    global queue
    while queue.qsize() > 0:
        num = queue.get()
        print(num)
        save_to_es(num)
 
if __name__ == '__main__':
    start = time.time()
    queue = Queue()
    # 序号数据进队列
    for num in range(500):
        queue.put(num)
 
    # 多线程执行程序
    consumer_lst = []
    for _ in range(10):
        thread = threading.Thread(target=run)
        thread.start()
        consumer_lst.append(thread)
    for consumer in consumer_lst:
        consumer.join()
    end = time.time()
    print('程序执行完毕!花费时间:', end - start)

运行结果:

到此这篇关于使用python生成大量数据写入es数据库并查询操作(2)的文章就介绍到这了,更多相关python生成 数据 内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • python实现高斯判别分析算法的例子

    python实现高斯判别分析算法的例子

    今天小编就为大家分享一篇python实现高斯判别分析算法的例子,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2019-12-12
  • 详解Python prometheus_client使用方式

    详解Python prometheus_client使用方式

    本文主要介绍了Python prometheus_client使用方式,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2022-02-02
  • python生成每日报表数据(Excel)并邮件发送的实例

    python生成每日报表数据(Excel)并邮件发送的实例

    今天小编就为大家分享一篇python生成每日报表数据(Excel)并邮件发送的实例,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2019-02-02
  • Django REST framework 视图和路由详解

    Django REST framework 视图和路由详解

    这篇文章主要介绍了Django REST framework 视图和路由详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-07-07
  • Python实现将图像转换为ASCII字符图

    Python实现将图像转换为ASCII字符图

    使用Python进行图像处理,非常快捷方便,往往简短几行代码就可以实现功能强大的效果。在这篇文章中,我们将使用Python将图像转换为ASCII字符照,感兴趣的可以了解一下
    2022-08-08
  • pycharm工具连接mysql数据库失败问题

    pycharm工具连接mysql数据库失败问题

    这篇文章主要介绍了pycharm工具连接mysql数据库失败问题及解决方法,非常不错大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-04-04
  • python动态文本进度条的实例代码

    python动态文本进度条的实例代码

    这篇文章主要介绍了python动态文本进度条的实例代码,代码简单易懂,非常不错,具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-01-01
  • python中metaclass原理与用法详解

    python中metaclass原理与用法详解

    这篇文章主要介绍了python中metaclass原理与用法,结合具体实例形式分析了Python中metaclass的功能、原理及使用metaclass动态创建类相关操作技巧,需要的朋友可以参考下
    2019-06-06
  • python encrypt 实现AES加密的实例详解

    python encrypt 实现AES加密的实例详解

    在本篇文章里小编给大家分享的是关于python encrypt 实现AES加密的实例内容,有兴趣的朋友们可以参考下。
    2020-02-02
  • Python之PyQt6对话框的实现

    Python之PyQt6对话框的实现

    这篇文章主要介绍了Python之PyQt6对话框的实现,文章内容详细,简单易懂,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2023-01-01

最新评论