#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import print_function

from os.path import dirname, basename, abspath
from itertools import chain
from datetime import datetime
import logging

import git

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk, streaming_bulk


def create_git_index(client, index):
    # create empty index
    client.indices.create(
        index=index,
        body={
            'settings': {
                # just one shard, no replicas for testing
                'number_of_shards': 1,
                'number_of_replicas': 0,

                # custom analyzer for analyzing file paths
                'analysis': {
                    'analyzer': {
                        'file_path': {
                            'type': 'custom',
                            'tokenizer': 'path_hierarchy',
                            'filter': ['lowercase']
                        }
                    }
                }
            }
        },
        # ignore already existing index
        ignore=400
    )

    # we will use the same user mapping in several places
    user_mapping = {
        'properties': {
            'name': {
                'type': 'multi_field',
                'fields': {
                    'raw': {'type': 'string', 'index': 'not_analyzed'},
                    'name': {'type': 'string'}
                }
            }
        }
    }

    client.indices.put_mapping(
        index=index,
        doc_type='repos',
        body={
            'repos': {
                'properties': {
                    'owner': user_mapping,
                    'created_at': {'type': 'date'},
                    'description': {
                        'type': 'string',
                        'analyzer': 'snowball',
                    },
                    'tags': {
                        'type': 'string',
                        'index': 'not_analyzed'
                    }
                }
            }
        }
    )

    client.indices.put_mapping(
        index=index,
        doc_type='commits',
        body={
            'commits': {
                '_parent': {
                    'type': 'repos'
                },
                'properties': {
                    'author': user_mapping,
                    'authored_date': {'type': 'date'},
                    'committer': user_mapping,
                    'committed_date': {'type': 'date'},
                    'parent_shas': {'type': 'string', 'index': 'not_analyzed'},
                    'description': {'type': 'string', 'analyzer': 'snowball'},
                    'files': {'type': 'string', 'analyzer': 'file_path'}
                }
            }
        }
    )
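

# Illustrative sketch, not part of the original example: the `file_path`
# analyzer above is built on the `path_hierarchy` tokenizer, so a path like
# 'Lib/Foo/bar.py' is indexed as the tokens 'lib', 'lib/foo' and
# 'lib/foo/bar.py'. Assuming this client version accepts `analyzer` and
# `text` as parameters to the analyze API, the tokens can be inspected with
# a helper like this (the function name and sample path are made up):
def show_file_path_tokens(client, index, path='Lib/Foo/bar.py'):
    analysis = client.indices.analyze(index=index, analyzer='file_path', text=path)
    for token in analysis['tokens']:
        print(token['token'])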


def parse_commits(repo, name):
    """
    Go through the git repository log and generate a document per commit
    containing all the metadata.
    """
    for commit in repo.log():
        yield {
            '_id': commit.id,
            '_parent': name,
            'committed_date': datetime(*commit.committed_date[:6]),
            'committer': {
                'name': commit.committer.name,
                'email': commit.committer.email,
            },
            'authored_date': datetime(*commit.authored_date[:6]),
            'author': {
                'name': commit.author.name,
                'email': commit.author.email,
            },
            'description': commit.message,
            'parent_shas': [p.id for p in commit.parents],
            # we only care about the filenames, not the per-file stats
            'files': list(chain(commit.stats.files)),
            'stats': commit.stats.total,
        }
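

# Notes added for clarity, not in the original: the dictionaries yielded by
# `parse_commits` carry their own `_id` and `_parent` keys; the bulk helpers
# treat underscore-prefixed keys as action metadata, which is how each commit
# ends up indexed under its sha and routed to its parent repo document. Also
# note that `repo.log()`, `commit.id` and the tuple-like dates belong to an
# older GitPython API; newer releases expose `iter_commits()` and `hexsha`.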


def load_repo(client, path=None, index='git'):
    """
    Parse a git repository with all its commits and load it into elasticsearch
    using `client`. If the index doesn't exist it will be created.
    """
    path = dirname(dirname(abspath(__file__))) if path is None else path
    repo_name = basename(path)
    repo = git.Repo(path)

    create_git_index(client, index)

    # create the parent document in case it doesn't exist
    client.create(
        index=index,
        doc_type='repos',
        id=repo_name,
        body={},
        ignore=409  # 409 - conflict - document is already there
    )

    # we let the streaming bulk continuously process the commits as they come
    # in - since the `parse_commits` function is a generator this will avoid
    # loading all the commits into memory
    for ok, result in streaming_bulk(
            client,
            parse_commits(repo, repo_name),
            index=index,
            doc_type='commits',
            chunk_size=50):
        action, result = result.popitem()
        doc_id = '/%s/commits/%s' % (index, result['_id'])
        # process the information from ES whether the document has been
        # successfully indexed
        if not ok:
            print('Failed to %s document %s: %r' % (action, doc_id, result))
        else:
            print(doc_id)
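

# Example usage sketch (the path is hypothetical): any local clone can be
# loaded into its own index instead of the defaults used below, e.g.
# load_repo(Elasticsearch(), path='/tmp/some-clone', index='git')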


# manually create the elasticsearch repo document and update the
# elasticsearch-py one with metadata via the bulk API
REPO_ACTIONS = [
    {'_type': 'repos', '_id': 'elasticsearch', '_source': {
        'owner': {'name': 'Shay Banon', 'email': 'kimchy@gmail.com'},
        'created_at': datetime(2010, 2, 8, 15, 22, 27),
        'tags': ['search', 'distributed', 'lucene'],
        'description': 'You know, for search.'}},

    {'_type': 'repos', '_id': 'elasticsearch-py', '_op_type': 'update', 'doc': {
        'owner': {'name': 'Honza Král', 'email': 'honza.kral@gmail.com'},
        'created_at': datetime(2013, 5, 1, 16, 37, 32),
        'tags': ['elasticsearch', 'search', 'python', 'client'],
        'description': 'For searching snakes.'}},
]
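

# A note added for clarity, not in the original: the first action above has
# no `_op_type`, so the bulk helpers default to `index` and send `_source` as
# the document body; the second uses `_op_type` 'update' with a partial `doc`,
# which assumes the 'elasticsearch-py' document already exists (load_repo
# below creates it when run from a checkout directory of that name).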


if __name__ == '__main__':
    # get trace logger and set level
    tracer = logging.getLogger('elasticsearch.trace')
    tracer.setLevel(logging.INFO)
    tracer.addHandler(logging.FileHandler('/tmp/es_trace.log'))

    # instantiate es client, connects to localhost:9200 by default
    es = Elasticsearch()

    # we load the repo and all commits
    load_repo(es)

    # run the bulk operations
    success, _ = bulk(es, REPO_ACTIONS, index='git', raise_on_error=True)
    print('Performed %d actions' % success)

    # now we can retrieve the documents
    es_repo = es.get(index='git', doc_type='repos', id='elasticsearch')
    print('%s: %s' % (es_repo['_id'], es_repo['_source']['description']))

    # update - add java to es tags
    es.update(
        index='git',
        doc_type='repos',
        id='elasticsearch',
        body={
            "script": "ctx._source.tags += tag",
            "params": {
                "tag": "java"
            }
        }
    )

    # refresh to make the documents available for search
    es.indices.refresh(index='git')

    # and now we can count the documents
    print(es.count(index='git')['count'], 'documents in index')
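
    # Illustrative final query, not in the original example: thanks to the
    # `file_path` analyzer on the `files` field, commits can be looked up by
    # a path prefix (the path used here is just an example).
    result = es.search(
        index='git',
        doc_type='commits',
        body={'query': {'match': {'files': 'elasticsearch/helpers'}}}
    )
    print('%d commits touched elasticsearch/helpers' % result['hits']['total'])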