elasticsearch索引创建create index集群matedata更新

网友投稿 676 2022-10-10 22:00:13

elasticsearch索引创建create index集群matedata更新

elasticsearch索引创建create index集群matedata更新

目录创建索引更新集群index matedata首先创建index的create方法从indice中获取对应的IndexService总结

创建索引更新集群index matedata

创建索引需要创建索引并且更新集群index matedata,这一过程在MetaDataCreateIndexService的createIndex方法中完成。这里会提交一个高优先级,AckedClusterStateUpdateTask类型的task。索引创建需要即时得到反馈,异常这个task需要返回,会超时,而且这个任务的优先级也非常高。

下面具体看一下它的execute方法,这个方法会在master执行任务时调用,这个方法非常长,主要完成以下三个功能:更新合并request,template中的mapping和setting,调用indiceService创建索引,对创建后的索引添加mapping。这一系列功能完成后,合并完成后生成新的matedata,并更新集群状态,完成了索引的创建。具体的调用方法参考上一篇。

首先创建index的create方法

代码如下所示:

@Override

public ClusterState execute(ClusterState currentState) throws Exception {

boolean indexCreated = false;

String removalReason = null;

try {

            //检查request的合法性,1.5版本主要检查index名字是否合法,如不能含有某些字符,另外就是集群节点版本

validate(request, currentState);

for (Alias alias : request.aliases()) {//检查别称是否合法

aliasValidator.validateAlias(alias, request.index(), currentState.metaData());

}

// 查找索引模板

List<IndexTemplateMetaData> templates = findTemplates(request, currentState, indexTemplateFilter);

Map<String, Custom> customs = Maps.newHashMap();

// add the request mapping

Map<String, Map<String, Object>http://> mappings = Maps.newHashMap();

Map<String, AliasMetaData> templatesAliases = Maps.newHashMap();

List<String> templateNames = Lists.newArrayList();

            //取出request中的mapping配置,虽然mapping可以后面添加,多数情况创建索引的时候还是会附带着mapping,在request中mapping是一个map

for (Map.Entry<String, String> entry : request.mappings().entrySet()) {

mappings.put(entry.getKey(), parseMapping(entry.getValue()));

}

            //一些预设如warm等

for (Map.Entry<String, Custom> entry : request.customs().entrySet()) {

customs.put(entry.getKey(), entry.getValue());

}

// 将找到的template和request中的mapping合并

for (IndexTemplateMetaData template : templates) {

templateNames.add(template.getName());

for (ObjectObjectCursor<String, CompressedString> cursor : template.mappings()) {

if (mappings.containsKey(cursor.key)) {

XContentHelper.mergeDefaults(mappings.get(cursor.key), parseMapping(cursor.value.string()));

} else {

mappings.put(cursor.key, parseMapping(cursor.value.string()));

}

}

// 合并custom

for (ObjectObjectCursor<String, Custom> cursor : template.customs()) {

String type = cursor.key;

IndexMetaData.Custom custom = cursor.value;

IndexMetaData.Custom existing = customs.get(type);

if (existing == null) {

customs.put(type, custom);

} else {

IndexMetaData.Custom merged = IndexMetaData.lookupFactorySafe(type).merge(existing, custom);

customs.put(type, merged);

}

}

//处理合并别名

for (ObjectObjectCursor<String, AliasMetaData> cursor : template.aliases()) {

AliasMetaData aliasMetaData = cursor.value;

//if an alias with same name came with the create index request itself,

// ignore this one taken from the index template

if (request.aliases().contains(new Alias(aliasMetaData.alias()))) {

continue;

}

//if an alias with same name was already processed, ignore this one

if (templatesAliases.containsKey(cursor.key)) {

continue;

}

//Allow templatesAliases to be templated by replacing a token with the name of the index that we are applying it to

if (aliasMetaData.alias().contains("{index}")) {

String templatedAlias = aliasMetaData.alias().replace("{index}", request.index());

aliasMetaData = AliasMetaData.newAliasMetaData(aliasMetaData, templatedAlias);

}

aliasValidator.validateAliasMetaData(aliasMetaData, request.index(), currentState.metaData());

templatesAliases.put(aliasMetaData.alias(), aliasMetaData);

}

}

// 合并完template和request,现在开始处理配置基本的mapping,合并逻辑跟之前相同,只是mapping来源不同

File mappingsDir = new File(environment.configFile(), "mappings");

if (mappingsDir.isDirectory()) {

// first index level

File indexMappingsDir = new File(mappingsDir, request.index());

if (indexMappingsDir.isDirectory()) {

addMappings(mappings, indexMappingsDir);

}

// second is the _default mapping

File defaultMappingsDir = new File(mappingsDir, "_default");

if (defaultMappingsDir.isDirectory()) {

addMappings(mappings, defaultMappingsDir);

}

}

            //处理index的配置(setting)

ImmutableSettings.Builder indexSettingsBuilder = settingsBuilder();

//加入模板中的setting

for (int i = templates.size() - 1; i >= 0; i--) {

indexSettingsBuilder.put(templates.get(i).settings());

}

// 加入request中的mapping,request中设置会覆盖模板中的设置

indexSettingsBuilder.put(request.settings());

            //处理shard,shard数量不能小于1,因此这里需要特殊处理,如果没有则要使用默认值

if (request.index().equals(ScriptService.SCRIPT_INDEX)) {

indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS, settings.getAsInt(SETTING_NUMBER_OF_SHARDS, 1));

} else {

if (indexSettingsBuilder.get(SETTING_NUMBER_OF_SHARDS) == null) {

http:// if (request.index().equals(riverIndexName)) {

indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS, settings.getAsInt(SETTING_NUMBER_OF_SHARDS, 1));

} else {

indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS, settings.getAsInt(SETTING_NUMBER_OF_SHARDS, 5));

}

}

}

if (request.index().equals(ScriptService.SCRIPT_INDEX)) {

indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS, settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, 0));

indexSettingsBuilder.put(SETTING_AUTO_EXPAND_REPLICAS, "0-all");

}

else {

if (indexSettingsBuilder.get(SETTING_NUMBER_OF_REPLICAS) == null) {

if (request.index().equals(riverIndexName)) {

indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS, settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, 1));

} else {

indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS, settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, 1));

}

}

}

            //处理副本

if (settings.get(SETTING_AUTO_EXPAND_REPLICAS) != null && indexSettingsBuilder.get(SETTING_AUTO_EXPAND_REPLICAS) == null) {

indexSettingsBuilder.put(SETTING_AUTO_EXPAND_REPLICAS, settings.get(SETTING_AUTO_EXPAND_REPLICAS));

}

if (indexSettingsBuilder.get(SETTING_VERSION_CREATED) == null) {

DiscoveryNodes nodes = currentState.nodes();

final Version createdVersion = Version.smallest(version, nodes.smallestNonClientNodeVersion());

indexSettingsBuilder.put(SETTING_VERSION_CREATED, createdVersion);

}

if (indexSettingsBuilder.get(SETTING_CREATION_DATE) == null) {

indexSettingsBuilder.put(SETTING_CREATION_DATE, Syshttp://tem.currentTimeMillis());

}

indexSettingsBuilder.put(SETTING_UUID, Strings.randomBase64UUID());

            //创建setting

Settings actualIndexSettings = indexSettingsBuilder.build();

            // 通过indiceservice创建索引

indicesService.createIndex(request.index(), actualIndexSettings, clusterService.localNode().id());

indexCreated = true;

//如果创建成功这里就可以获取到对应的indexservice,否则会抛出异常

IndexService indexService = indicesService.indexServiceSafe(request.index());

            //获取mappingService试图放置mapping

MapperService mapperService = indexService.mapperService();

// 为索引添加mapping,首先是默认mapping

if (mappings.containsKey(MapperService.DEFAULT_MAPPING)) {

try {

mapperService.merge(MapperService.DEFAULT_MAPPING, new CompressedString(XContentFactory.jsonBuilder().map(mappings.get(MapperService.DEFAULT_MAPPING)).string()), false);

} catch (Exception e) {

removalReason = "failed on parsing default mapping on index creation";

throw new MapperParsingException("mapping [" + MapperService.DEFAULT_MAPPING + "]", e);

}

}

for (Map.Entry<String, Map<String, Object>> entry : mappings.entrySet()) {

if (entry.getKey().equals(MapperService.DEFAULT_MAPPING)) {

continue;

}

try {

// apply the default here, its the first time we parse it

mapperService.merge(entry.getKey(), new CompressedString(XContentFactory.jsonBuilder().map(entry.getValue()).string()), true);

} catch (Exception e) {

removalReason = "failed on parsing mappings on index creation";

throw new MapperParsingException("mapping [" + entry.getKey() + "]", e);

}

}

            //添加request中的别称

IndexQueryParserService indexQueryParserService = indexService.queryParserService();

for (Alias alias : request.aliases()) {

if (Strings.hasLength(alias.filter())) {

aliasValidator.validateAliasFilter(alias.name(), alias.filter(), indexQueryParserService);

}

}

for (AliasMetaData aliasMetaData : templatesAliases.values()) {

if (aliasMetaData.filter() != null) {

aliasValidator.validateAliasFilter(aliasMetaData.alias(), aliasMetaData.filter().uncompressed(), indexQueryParserService);

}

}

// 以下更新Index的matedata,

Map<String, MappingMetaData> mappingsMetaData = Maps.newHashMap();

for (DocumentMapper mapper : mapperService.docMappers(true)) {

MappingMetaData mappingMd = new MappingMetaData(mapper);

mappingsMetaData.put(mapper.type(), mappingMd);

}

final IndexMetaData.Builder indexMetaDataBuilder = IndexMetaData.builder(request.index()).settings(actualIndexSettings);

for (MappingMetaData mappingMd : mappingsMetaData.values()) {

indexMetaDataBuilder.putMapping(mappingMd);

}

for (AliasMetaData aliasMetaData : templatesAliases.values()) {

indexMetaDataBuilder.putAlias(aliasMetaData);

}

for (Alias alias : request.aliases()) {

AliasMetaData aliasMetaData = AliasMetaData.builder(alias.name()).filter(alias.filter())

.indexRouting(alias.indexRouting()).searchRouting(alias.searchRouting()).build();

indexMetaDataBuilder.putAlias(aliasMetaData);

}

for (Map.Entry<String, Custom> customEntry : customs.entrySet()) {

indexMetaDataBuilder.putCustom(customEntry.getKey(), customEntry.getValue());

}

indexMetaDataBuilder.state(request.state());

            //matedata更新完毕,build新的matedata

final IndexMetaData indexMetaData;

try {

indexMetaData = indexMetaDataBuilder.build();

} catch (Exception e) {

removalReason = "failed to build index metadata";

throw e;

}

indexService.indicesLifecycle().beforeIndexAddedToCluster(new Index(request.index()),

indexMetaData.settings());

            //更新集群的matedata,将新build的indexmatadata加入到metadata中

MetaData newMetaData = MetaData.builder(currentState.metaData())

.put(indexMetaData, false)

.build();

logger.info("[{}] creating index, cause [{}], templates {}, shards [{}]/[{}], mappings {}", request.index(), request.cause(), templateNames, indexMetaData.numberOfShards(), indexMetaData.numberOfReplicas(), mappings.keySet());

            //阻塞集群,更新matadata

ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());

if (!request.blocks().isEmpty()) {

for (ClusterBlock block : request.blocks()) {

blocks.addIndexBlock(request.index(), block);

}

}

if (request.state() == State.CLOSE) {

blocks.addIndexBlock(request.index(), MetaDataIndexStateService.INDEX_CLOSED_BLOCK);

}

ClusterState updatedState = ClusterState.builder(currentState).blocks(blocks).metaData(newMetaData).build();

if (request.state() == State.OPEN) {

RoutingTable.Builder routingTableBuilder = RoutingTable.builder(updatedState.routingTable())

.addAsNew(updatedState.metaData().index(request.index()));

RoutingAllocation.Result routingResult = allocationService.reroute(ClusterState.builder(updatedState).routingTable(routingTableBuilder).build());

updatedState = ClusterState.builder(updatedState).routingResult(routingResult).build();

}

removalReason = "cleaning up after validating index on master";

return updatedState;

} finally {

if (indexCreated) {

// Index was already partially created - need to clean up

indicesService.removeIndex(request.index(), removalReason != null ? removalReason : "failed to create index");

}

}

}

});

}

以上就是创建index的create方法,方法中主要进行了两个动作:合并更新index的matadata和创建index。更新合并matadata的过程都在上面的代码中体现了。

从indice中获取对应的IndexService

创建索引是调用indiceSerivice构建一个guice的injector,这个injector包含了Index的所有功能(如分词,相似度等)。同时会将其存储到indiceService中,以一个map的格式存储Map> indices。运行中的集群每次对某个索引的操作都首先从indice中获取对应的IndexService。

这一部分代码如下所示:

public synchronized IndexService createIndex(String sIndexName, @IndexSettings Settings settings, String localNodeId) throws ElasticsearchException {

if (!lifecycle.started()) {

throw new ElasticsearchIllegalStateException("Can't creatIBpbEe an index [" + sIndexName + "], node is closed");

}

Index index = new Index(sIndexName);

    //检测index是否已经存在

if (indices.containsKey(index.name())) {

throw new IndexAlreadyExistsException(index);

}

indicesLifecycle.beforeIndexCreated(index, settings);

logger.debug("creating Index [{}], shards [{}]/[{}]", sIndexName, settings.get(SETTING_NUMBER_OF_SHARDS), settings.get(SETTING_NUMBER_OF_REPLICAS));

Settings indexSettings = settingsBuilder()

.put(this.settings)

.put(settings)

.classLoader(settings.getClassLoader())

.build();

    //构建index对应的injector

ModulesBuilder modules = new Modhttp://ulesBuilder();

modules.add(new IndexNameModule(index));

modules.add(new LocalNodeIdModule(localNodeId));

modules.add(new IndexSettingsModule(index, indexSettings));

modules.add(new IndexPluginsModule(indexSettings, pluginsService));

modules.add(new IndexStoreModule(indexSettings));

modules.add(new IndexEngineModule(indexSettings));

modules.add(new AnalysisModule(indexSettings, indicesAnalysisService));

modules.add(new SimilarityModule(indexSettings));

modules.add(new IndexCacheModule(indexSettings));

modules.add(new IndexFieldDataModule(indexSettings));

modules.add(new CodecModule(indexSettings));

modules.add(new MapperServiceModule());

modules.add(new IndexQueryParserModule(indexSettings));

modules.add(new IndexAliasesServiceModule());

modules.add(new IndexGatewayModule(indexSettings, injector.getInstance(Gateway.class)));

modules.add(new IndexModule(indexSettings));

Injector indexInjector;

try {

indexInjector = modules.createChildInjector(injector);

} catch (CreationException e) {

throw new IndexCreationException(index, Injectors.getFirstErrorFailure(e));

} catch (Throwable e) {

throw new IndexCreationException(index, e);

}

IndexService indexService = indexInjector.getInstance(IndexService.class);

indicesLifecycle.afterIndexCreated(indexService);

     //将Indexservice和IndexInjector加入到indice map中

indices = newMapBuilder(indices).put(index.name(), new Tuple<>(indexService, indexInjector)).immutableMap();

return indexService;

}

以上方法就是具体创建索引的过程,它是在master上操作的,同时它是同步方法。这样才能保证集群的Index创建一致性,因此这也会导致之前所说的大量创建创建索引时候的速度瓶颈。但是创建大量索引的动作是不常见的,需要尽量避免。创建一个索引对于一个集群来说就是开启对于该索引的各种操作,因此这里通过guice将索引的各个功能模块注入,并获得index操作的接口类Indexservice。如果这个方法执行成功,则可以合并template及request中的mapping,并且向刚创建的索引添加合并后的mapping,最后构建新的matadata,并将集群新的matadata发送给各个节点完成索引创建。

总结

索引创建的过程包括三步:更新集群matadata,调用indiceService中创建索引,向新创建的索引中放置(合并到Index对应的mappingService中)mapping。这三步都在以上的两个方法中。完成这三步,集群中就保存了新索引的信息,同时索引配置和mapping放置也完成。索引就可以正常使用。

以上就是elasticsearch索引创建create index集群matedata更新的详细内容,更多关于elasticsearch索引create index matedata更新的资料请关注我们其它相关文章!

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:CheckLinux脚本
下一篇:Ghidra- 软件逆向工程框架(ghidra github)
相关文章