lawsoc-scraper/elasticsearch/index_json_es.py

489 lines
20 KiB
Python

#!/usr/bin/python3
# Import system modules...
# import sys
import os
import time
# import logging
import json
from pyelasticsearch import ElasticSearch
from pyelasticsearch import bulk_chunks
# Elasticsearch mapping for the "office" document type.
#
# The analyzer names used here ("index_analyzer", "search_analyzer",
# "name_analyzer", "email_analyzer", "url_analyzer", "metaphone_analyzer")
# are the custom analyzers declared in the settings built by
# create_lawsoc_index().  Most text fields carry a "raw" sub-field that is
# not analyzed, so the exact original value stays available alongside the
# analyzed form.
# NOTE(review): nothing in the visible part of this file sends this mapping
# to Elasticsearch -- confirm where (or whether) it is applied.
office_mapping = {
    "mappings": {
        "office": {
            "_timestamp": {
                "enabled": "true",
                "format": "YYYY-MM-dd",
                "default": "1970-01-01",
            },
            # `_all` is a type-level meta-field, so it is configured next to
            # `_timestamp`; nested under "properties" it would be parsed as
            # an ordinary document field that has no "type".
            "_all": {
                "enabled": "true",
                "index_analyzer": "index_analyzer",
                "search_analyzer": "search_analyzer",
            },
            "properties": {
                "location": {"type": "geo_point"},
                "solicitor_id": {
                    "type": "string",
                    "fields": {
                        "raw": {"type": "string", "index": "not_analyzed"},
                    },
                },
                "sra_id": {
                    "type": "string",
                    "fields": {
                        "raw": {"type": "string", "index": "not_analyzed"},
                    },
                },
                "dx_address": {
                    "type": "string",
                    "fields": {
                        "raw": {"type": "string", "index": "not_analyzed"},
                    },
                },
                "email": {
                    "type": "string",
                    "search_analyzer": "email_analyzer",
                    "index_analyzer": "email_analyzer",
                    "fields": {
                        "raw": {"type": "string", "index": "not_analyzed"},
                    },
                },
                # Multi-valued field, boosted and indexed under the singular
                # name "facility".
                "facilities": {
                    "type": "string",
                    "boost": 4.0,
                    "index_name": "facility",
                    "index": "analyzed",
                    "index_analyzer": "index_analyzer",
                    "search_analyzer": "search_analyzer",
                    "store": "yes",
                    "fields": {
                        "raw": {"type": "string", "index": "not_analyzed"},
                    },
                },
                # Names get an extra phonetic ("metaphone") sub-field in
                # addition to the raw one.
                "name": {
                    "type": "string",
                    "search_analyzer": "name_analyzer",
                    "index_analyzer": "name_analyzer",
                    "fields": {
                        "metaphone": {
                            "type": "string",
                            "analyzer": "metaphone_analyzer",
                        },
                        "raw": {"type": "string", "index": "not_analyzed"},
                    },
                },
                "tel": {
                    "type": "string",
                    "fields": {
                        "raw": {"type": "string", "index": "not_analyzed"},
                    },
                },
                "web": {
                    "type": "string",
                    "search_analyzer": "url_analyzer",
                    "index_analyzer": "url_analyzer",
                    "fields": {
                        "raw": {"type": "string", "index": "not_analyzed"},
                    },
                },
                "address": {
                    "type": "string",
                    "fields": {
                        "raw": {"type": "string", "index": "not_analyzed"},
                    },
                },
            },
        }
    }
}
# Elasticsearch mapping for the "person" document type.
#
# The analyzer names used here are the custom analyzers declared in the
# settings built by create_lawsoc_index().  Most fields carry a "raw"
# sub-field that is not analyzed, so the exact original value stays
# available alongside the analyzed/typed form.
# NOTE(review): nothing in the visible part of this file sends this mapping
# to Elasticsearch -- confirm where (or whether) it is applied.
person_mapping = {
    "mappings": {
        "person": {
            "_timestamp": {
                "enabled": "true",
                "format": "YYYY-MM-dd",
                "default": "1970-01-01",
            },
            # `_all` is a type-level meta-field, so it is configured next to
            # `_timestamp`; nested under "properties" it would be parsed as
            # an ordinary document field that has no "type".
            "_all": {
                "enabled": "true",
                "index_analyzer": "index_analyzer",
                "search_analyzer": "search_analyzer",
            },
            "properties": {
                "admitted_date": {
                    "type": "date",
                    "format": "yyyy/MM/dd",
                    # NOTE(review): "ignore_missing" does not look like a
                    # date-mapping option ("ignore_malformed" may have been
                    # intended) -- confirm before relying on it.
                    "ignore_missing": True,
                },
                "person_id": {
                    "type": "integer",
                    "fields": {
                        "raw": {"type": "string", "index": "not_analyzed"},
                    },
                },
                "solicitor_id": {
                    "type": "integer",
                    "fields": {
                        "raw": {"type": "string", "index": "not_analyzed"},
                    },
                },
                "sra_id": {
                    "type": "integer",
                    "fields": {
                        "raw": {"type": "string", "index": "not_analyzed"},
                    },
                },
                "type": {
                    "type": "string",
                    "fields": {
                        "raw": {"type": "string", "index": "not_analyzed"},
                    },
                },
                # The multi-valued fields below (roles, languages,
                # areas_of_practice, accreditations) share one shape:
                # boosted, analyzed, stored, and indexed under the singular
                # form of the field name.
                "roles": {
                    "type": "string",
                    "boost": 4.0,
                    "index_name": "role",
                    "index": "analyzed",
                    "index_analyzer": "index_analyzer",
                    "search_analyzer": "search_analyzer",
                    "store": "yes",
                    "fields": {
                        "raw": {"type": "string", "index": "not_analyzed"},
                    },
                },
                # Names get an extra phonetic ("metaphone") sub-field in
                # addition to the raw one.
                "name": {
                    "type": "string",
                    "search_analyzer": "name_analyzer",
                    "index_analyzer": "name_analyzer",
                    "fields": {
                        "metaphone": {
                            "type": "string",
                            "analyzer": "metaphone_analyzer",
                        },
                        "raw": {"type": "string", "index": "not_analyzed"},
                    },
                },
                "languages": {
                    "type": "string",
                    "boost": 4.0,
                    "index_name": "language",
                    "index": "analyzed",
                    "index_analyzer": "index_analyzer",
                    "search_analyzer": "search_analyzer",
                    "store": "yes",
                    "fields": {
                        "raw": {"type": "string", "index": "not_analyzed"},
                    },
                },
                "email": {
                    "type": "string",
                    "search_analyzer": "email_analyzer",
                    "index_analyzer": "email_analyzer",
                    "fields": {
                        "raw": {"type": "string", "index": "not_analyzed"},
                    },
                },
                "dx_address": {"type": "string", "index": "not_analyzed"},
                "areas_of_practice": {
                    "type": "string",
                    "boost": 4.0,
                    "index_name": "area_of_practice",
                    "index": "analyzed",
                    "index_analyzer": "index_analyzer",
                    "search_analyzer": "search_analyzer",
                    "store": "yes",
                    "fields": {
                        "raw": {"type": "string", "index": "not_analyzed"},
                    },
                },
                # "index" only accepts analyzed / not_analyzed / no; the
                # field's singular name belongs in "index_name", matching
                # the sibling fields above.
                "accreditations": {
                    "type": "string",
                    "boost": 4.0,
                    "index_name": "accreditation",
                    "index": "analyzed",
                    "index_analyzer": "index_analyzer",
                    "search_analyzer": "search_analyzer",
                    "store": "yes",
                    "fields": {
                        "raw": {"type": "string", "index": "not_analyzed"},
                    },
                },
                "tel": {
                    "type": "string",
                    "fields": {
                        "raw": {"type": "string", "index": "not_analyzed"},
                    },
                },
            },
        }
    }
}
def create_lawsoc_index(client, index):
    """Create an empty index with the lawsoc analysis settings.

    The request body configures a single shard with no replicas and declares
    the custom token filters and analyzers that the field mappings in this
    file refer to by name.

    Args:
        client: Object exposing ``indices.create(index=..., ignore=...,
            body=...)``.  NOTE(review): that is the call shape of the
            ``elasticsearch`` client, whereas this file imports
            ``pyelasticsearch`` -- confirm which client callers pass in.
        index: Name of the index to create.

    Returns:
        None.
    """
    # Token filters.  Flags are Python booleans (True/False) so the body
    # serialises to JSON true/false; the bare JSON spellings are not valid
    # Python names.
    token_filters = {
        "code": {
            "type": "pattern_capture",
            "preserve_original": 1,
            "patterns": [
                "(\\p{Ll}+|\\p{Lu}\\p{Ll}+|\\p{Lu}+)",
                "(\\d+)",
            ],
        },
        "email": {
            "type": "pattern_capture",
            "preserve_original": 1,
            "patterns": [
                "([^@]+)",
                "(\\p{L}+)",
                "(\\d+)",
                "@(.+)",
            ],
        },
        "phonetic_metaphone": {
            "encoder": "metaphone",
            "replace": False,
            "type": "phonetic",
        },
        "default_delimiter": {
            "type": "word_delimiter",
            "generate_word_parts": True,
            "generate_number_parts": True,
            "catenate_words": False,
            "catenate_numbers": False,
            "catenate_all": False,
            "split_on_case_change": True,
            "preserve_original": False,
            "split_on_numerics": True,
            "stem_english_possessive": True,
        },
        "my_delimiter": {
            "type": "word_delimiter",
            "generate_word_parts": True,
            "generate_number_parts": True,
            "catenate_words": False,
            "catenate_numbers": False,
            "catenate_all": False,
            "split_on_case_change": False,
            "preserve_original": True,
            "split_on_numerics": False,
            "stem_english_possessive": False,
        },
    }

    # Analyzers.  "index_analyzer" and "search_analyzer" share one filter
    # chain, except that the index side additionally applies "my_delimiter".
    analyzers = {
        "index_analyzer": {
            "tokenizer": "standard",
            "filter": [
                "standard",
                "lowercase",
                "stop",
                "asciifolding",
                "phonetic_metaphone",
                "my_delimiter",
            ],
        },
        "search_analyzer": {
            "tokenizer": "standard",
            "filter": [
                "standard",
                "lowercase",
                "stop",
                "asciifolding",
                "phonetic_metaphone",
            ],
        },
        "name_analyzer": {
            "type": "custom",
            "tokenizer": "standard",
            "filter": [
                "asciifolding",
                "lowercase",
            ],
        },
        "email_analyzer": {
            "type": "custom",
            "tokenizer": "uax_url_email",
            "filter": [
                "email",
                "lowercase",
                "unique",
            ],
        },
        "url_analyzer": {
            "type": "custom",
            "tokenizer": "uax_url_email",
        },
        "metaphone_analyzer": {
            "type": "custom",
            "tokenizer": "standard",
            "filter": [
                "phonetic_metaphone",
            ],
        },
    }

    settings = {
        "settings": {
            "index": {
                "number_of_replicas": 0,
                "number_of_shards": 1,
                "analysis": {
                    "filter": token_filters,
                    "analyzer": analyzers,
                },
            }
        }
    }

    # NOTE(review): ignore=409 tolerates a "conflict" response; verify that
    # the target Elasticsearch version reports an already-existing index
    # with that status (the elasticsearch-py docs use 400 for this case).
    client.indices.create(
        index=index,
        ignore=409,
        body=settings,
    )
def load_lawsoc(client, path=None, index='lawsoc', document_type='office', document_id=1):
    """Ensure the lawsoc index and its placeholder parent document exist.

    The index is created first so that its custom analysis settings are in
    place before any document is written; writing a document into an index
    that does not exist yet would let Elasticsearch auto-create it with
    default settings instead.

    Args:
        client: Object exposing ``indices.create(...)`` and
            ``create(index=..., doc_type=..., id=..., body=..., ignore=...)``.
        path: Unused by this function; kept so existing callers keep working.
        index: Name of the index to create and write into.
        document_type: Document type of the placeholder document.
        document_id: Id of the placeholder document.

    Returns:
        None.
    """
    create_lawsoc_index(client, index)

    # create the parent document in case it doesn't exist
    client.create(
        index=index,
        doc_type=document_type,
        id=document_id,
        body={},
        ignore=409,  # 409 - conflict - would be returned if the document is already there
    )
def documents(workspace, fileList):
    """Yield one bulk index operation per JSON file in *fileList*.

    Each file is parsed as JSON and wrapped with ``es.index_op`` so the
    result can be fed to ``bulk_chunks``.  ``es`` is the module-level client
    that the ``__main__`` section of this script creates; it must exist
    before the generator is advanced past its first file.

    Args:
        workspace: Directory containing the files.  A trailing path
            separator is accepted but not required.
        fileList: File names, relative to *workspace*.

    Yields:
        The value returned by ``es.index_op`` for each file's parsed
        content, in the order given by *fileList*.
    """
    print("Entering documents function")
    print("length of fileList: " + str(len(fileList)))
    for position, file_name in enumerate(fileList):
        print(position, '--->', file_name)
        # Close each file as soon as it has been parsed rather than leaving
        # the handle to the garbage collector; fileList can be long.
        with open(os.path.join(workspace, file_name), encoding="utf-8") as handle:
            contents = json.load(handle)
        yield es.index_op(contents)
# Script configuration.
#
# Input is read from <current working directory>/json_out/, which holds one
# sub-directory of JSON files per document kind.  To read from a fixed
# location instead, point workspace_home at it, e.g.:
# workspace_home = '/var/tmp/lawsoc_new/'
workspace_home = os.getcwd()
workspace = "{}/json_out/".format(workspace_home)
office_workspace = "{}office/".format(workspace)
person_workspace = "{}person/".format(workspace)
# Target file for the (currently commented-out) Elasticsearch trace logging.
log_workspace = "{}es_trace.log".format(workspace)

# Document types used when bulk-indexing each kind of file.
office_document_type, person_document_type = "office_detail", "person_detail"

# The index name carries the date of the run: lawsoc_YYYY-MM-DD.
index_date = time.strftime("%Y-%m-%d")
es_index_name = "_".join(("lawsoc", index_date))
if __name__ == '__main__':
    # Trace logging to log_workspace is currently switched off:
    # logging.basicConfig()
    # tracer = logging.getLogger('elasticsearch.trace').setLevel(logging.INFO)
    # tracer.setLevel(logging.INFO)
    # tracer.addHandler(logging.FileHandler(log_workspace))

    # Module-level client; documents() reads it as a global.
    es = ElasticSearch('http://localhost:9200/')

    # Snapshot both directory listings before any indexing starts.
    office_fileList = os.listdir(office_workspace)
    person_fileList = os.listdir(person_workspace)

    def _bulk_index(directory, file_names, doc_type):
        """Bulk-index every file of *directory* under *doc_type*.

        Any exception raised while reading or sending is printed and ends
        this directory's run; it is not propagated.
        """
        try:
            operations = documents(directory, file_names)
            chunks = bulk_chunks(
                operations,
                docs_per_chunk=100,
                bytes_per_chunk=10000,
            )
            for chunk in chunks:
                es.bulk(chunk, doc_type=doc_type, index=es_index_name)
        except Exception as e:
            print(e)

    # Offices first...
    _bulk_index(office_workspace, office_fileList, office_document_type)
    print("Finished office json files.")
    time.sleep(5)

    # ...then people.
    _bulk_index(person_workspace, person_fileList, person_document_type)