如何将数据从Web服务处理到MongoDB中

网友投稿 280 2023-12-05 11:17:05

如何将数据从Web服务处理到MongoDB中

本篇内容主要讲解“如何将数据从Web服务处理到MongoDB中”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“如何将数据从Web服务处理到MongoDB中”吧!

概观

如何创建一个使用Web服务数据并将其插入MongoDB数据库的Spring Batch应用程序。

要求

阅读本文的开发人员必须熟悉Spring Batch(示例)和MongoDB。

环境

Mongo数据库部署在MLab中。请按照本快速入门中的步骤操作。

批处理应用程序部署在Heroku PaaS中。详情  请看这里。

IDE STS或IntelliJ或Eclipse。

Java 8 JDK。

注意:批处理也可以在本地运行。

脚本

全局场景步骤是:

从Web服务读取数据,在这种情况下:https://sunrise-sunset.org/api

获取城市列表的坐标,然后调用API以读取日出和日落日期时间。

2.处理数据并提取业务数据

收集数据的业务处理

3.在MongoDB中插入已处理的数据

将处理过的数据保存为mongo文档

编码

输入:本地文件中JSON格式的城市数据列表,如下所示:

[   {      “名字”:“Danemark”,      “城市”:[         {            “名字”:“Copenhague”,            “lat”:55.676098,            “lng”:12.568337,            “timeZone”:“CET”         },         {            “名字”:“奥胡斯”,            “lat”:56.162939,            “lng”:10.203921,            “timeZone”:“CET”         },         {            “名字”:“欧登塞”,            “lat”:55.39594,            “lng”:10.38831,            “timeZone”:“CET”         },         {            “名字”:“奥尔堡”,            “lat”:57.046707,            “lng”:9.935932,            “timeZone”:“CET”         }      ]   }]

我们的场景从本地json文件获取输入数据。映射bean如下:

国豆:

导入 java。io。可序列化 ;导入 java。util。清单 ;进口 com。fastxml。杰克逊。注释。JsonIgnoreProperties ;@JsonIgnoreProperties(ignoreUnknown  =truepublic  class  BCountry  实现 Serializable {private  static  final  longserialVersionUID  =1L ;私有 字符串 名称 ;私人 名单BCity >  城市 ;public  BCountry(){super();}public  BCountry(Stringname,List < BCity >  cities){super();这个。name  =  name ;这个。城市 =  城市 ;}public  BCountry(String  name){super();这个。name  =  name ;}public  String  getName(){返回 名称 ;}public  void  setName(String  name){这个。name  =  name ;}public  List< BCity >  getCities(){返回 城市 ;}public  void  setCities(List < BCity >  cities){这个。城市 =  城市 ;}@覆盖public  inthashCode(){final  int  prime  =  31 ;int  result  =  1 ;结果 =  黄金 *  结果 +((城市 ==  空)? 0:城市。的hashCode());结果 =  黄金 *  结果 +((名称 ==  空)? 0:名称。的hashCode());返回 结果 ;}@覆盖public  boolean  equals(Object  obj){ifthis  ==  obj)返回 true ;if(obj  ==  null返回 虚假 ;如果(的getClass()!=  OBJ。的getClass())返回 虚假 ;BCountry  other  =(BCountry)obj ;if(cities  ==  null){如果(其他。城市 !=  空)返回 虚假 ;} 否则 如果(!城市。平等(等。城市))返回 虚假 ;if(name  ==  null){如果(其他。名字 !=  空)返回 虚假 ;} 否则 如果(!名字。平等(其它。名))返回 虚假 ;返回 true ;}@覆盖public  String  toString(){返回 “BCountry [name =”  +  name  +  “,cities =”  +  cities  +  “]” ;}}

和城市豆:

导入 java。io。可序列化 ;进口 com。fastxml。杰克逊。注释。JsonIgnoreProperties ;@JsonIgnoreProperties(ignoreUnknown  =  true)公共 类 BCity  实现 Serializable {private  static  final  long  serialVersionUID  =  1L ;private  Stringname,timeZone ;私人 双 拉特,lng ;public  BCity(){super();}public  BCity(String  name,String  timeZone,double  lat,doublelng){super();这个。name  =  name ;这个。timeZone  =  timeZone ;这个。lat  =  lat ;这个。lng  =  lng ;}public  String  getName(){返回 名称 ;}public  void  setName(String  name){这个。name  =  name ;}public  StringgetTimeZone(){返回时 区 ;}public  void  setTimeZone(String  timeZone){这个。timeZone  =  timeZone ;}public  double  getLat(){返回 纬度 ;}public  void  setLat(double  lat){这个。lat  =  lat ;}public  double  getLng(){返回 lng ;}public  void  setLng(doublelng){这个。lng  =  lng ;}@覆盖public  String  toString(){返回 “BCity [name =”  +  name  +  “,timeZone =”  +  timeZone  +  “,lat =”  +  lat  +  “,lng =”  +  lng  +  “]” ;}@覆盖public  int  hashCode(){final  int  prime  =  31 ;int  result  =  1 ;长 温度 ;temp  =  Double。doubleToLongBits(lat);result  =  prime  *  result  +(int)(temp  ^(temp  >>>  32));temp  =  Double。doubleToLongBits(lng);result  =  prime  *  result  +(int)(temp  ^(temp  >>>  32));结果 =  黄金 *  结果 +((名称 ==  空)? 0:名称。的hashCode());结果 =   *  结果 +((的timeZone  ==  空)? 0:的timeZone。的hashCode());返回 结果 ;}@覆盖public  boolean  equals(Object  obj){ifthis  ==  obj)返回 true ;if(obj  ==  null返回 虚假 ;如果(的getClass()!=  OBJ。的getClass())返回 虚假 ;BCityother  =(BCity)obj ;如果(双。doubleToLongBits的(LAT)!=  双。doubleToLongBits的(其他的。LAT))返回 虚假 ;如果(双。doubleToLongBits的(LNG)!=  双。doubleToLongBits的(其他的。LNG))返回 虚假 ;if(name  ==  null){如果(其他。名字 !=  空)返回 虚假 ;} 否则 如果(!名字。平等(其它。名))返回 虚假 ;if(timeZone  ==  null){如果(其他。的timeZone  !=  空)返回 虚假 ;} 否则 如果(!的timeZone。平等(其它。的timeZone))返回 虚假 ;返回 true ;}}

批量阅读器实现@LineMapper。您可以使读者适应我们的数据源(示例):

导入 java。util。清单 ;进口 组织。弹簧框架。批次。项目。档案。LineMapper ;进口 com。ahajri。批次。豆子。BCountry ;进口 com。fastxml。杰克逊。数据绑定。ObjectMapper ;进口 com。fastxml。杰克逊。数据绑定。类型。CollectionType ;公共 类 BCountryJsonLineMapper  实现了 LineMapper <List < BCountry >> {private  final  ObjectMapper  mapper  =  new  ObjectMapper();@覆盖public  List < BCountry >  mapLine(String  line,intlineNumber)throws  Exception {CollectionTypecollectionType  = mapper。getTypeFactory()。constructCollectionType(列表。类,BCountry。类);返回 映射器。readValue(line,collectionType);}}

处理数据批处理时,检查同一天某个城市的业务处理数据是否已保存在数据库中。在MongoDB的搜索数据的方式就是在这个详细的岗位。

ItemProcessor将@BCountry对象转换为MongoDB Document对象。该过程详述如下:

public  class  BCountryPrayTimeEventItemProcessor  实现 ItemProcessor < List < BCountry >,List < Document >> {private  static  final  String  EVENTS_COLLECTION_NAME  =  “event” ;private  static  final  Logger  LOG  =  LoggerFactory。getLogger(BCountryPrayTimeEventItemProcessor。类);@Autowiredprivate  PrayTimeService  prayTimeService ;@Autowiredprivate  CloudMongoService  cloudMongoService ;@覆盖public  List < Document >  进程(List < BCountry >  items)抛出 Exception {final  List < Document >  docs  =  new  ArrayList <>();物品。stream()。forEach(item  - > {final  StringcountryName  =  item。getName();项目。getCities()。stream()。forEach(c  - > {final  Document  prayTimeCityEventDoc  =  newDocument();//循环城市并为今天提取祈祷时间final  String  cityName  =  c。getName();final  String  cityTimeZone  =  c。getTimeZone();final  double  lat  =  c。getLat();final  double  lng  =  c。getLng();final  LocalDateTime  nowOfCity  =  LocalDateTime。现在(了zoneid。的(cityTimeZone));final  QueryParam [] queryParams  =  new  QueryParam [ 5 ];queryParams [ 0 ] =   QueryParam(“CITY_NAME” ,OperatorEnum。EQ。名称(),的cityName);queryParams [ 1 ] =   QueryParam(“EVENT_TYPE” ,OperatorEnum。EQ。名称(),事件类型。PRAY_TIME。名称());queryParams [ 2 ] =   QueryParam(“月”,OperatorEnum。EQ。名称(),nowOfCity。getMonthValue());queryParams [ 3 ] =   QueryParam(“DAY_OF_MONTH” ,OperatorEnum。EQ。名称(),nowOfCity。getDayOfMonth());queryParams [ 4 ] =   QueryParam(“COUNTRY_NAME” ,OperatorEnum。EQ。名称(),国家名称);List < Document >  foundEvents  =  null ;尝试 {foundEvents  =  cloudMongoService。搜索(EVENTS_COLLECTION_NAME,queryParams);catchBusinessException  e1){记录。错误(“====>未找到城市祈祷时间”  +  的cityName  +  “对”  +  nowOfCity。getDayOfMonth()+  “/”+  nowOfCity。getMonthValue());}尝试 {如果(CollectionUtils。的isEmpty(foundEvents)){//祈祷时间尚未创建prayTimeCityEventDoc。put(“country_name”,countryName);prayTimeCityEventDoc。put(“city_name”,cityName);prayTimeCityEventDoc。把(“EVENT_TYPE” ,事件类型。PRAY_TIME。名称());prayTimeCityEventDoc。把(“复发”,RecurringEnum。YEARLY。名称());prayTimeCityEventDoc。把(“月”,nowOfCity。getMonthValue());prayTimeCityEventDoc。把(“DAY_OF_MONTH” ,nowOfCity。getDayOfMonth());prayTimeCityEventDoc。put(“lat”,lat);prayTimeCityEventDoc。put(“lng”,lng);prayTimeCityEventDoc。把(“CREATION_DATE” ,HCDateUtils。convertToDateViaSqlTimestamp(nowOfCity));finalMap < StringObject>  prayInfos  =  prayTimeService。getPrayTimeByLatLngDate(lat,lng,日期。从(nowOfCity。atZone(了zoneid。的(cityTimeZone))。toInstant()),cityTimeZone);prayTimeCityEventDoc。把(“pray_infos” ,文件。解析(新 GSON()的toJSON(prayInfos)));docs。add(prayTimeCityEventDoc);} else {记录。信息(字符串。格式(“====>祈祷的时间已经存在的城市:%S,月:%d,日:%d” ,cityName,nowOfCity。getMonthValue(),nowOfCity。getDayOfMonth()));}}catchBusinessExceptione){记录。错误(“计算祈祷时间时出问题:”,e);抛出 新的 RuntimeException(e);}});});返回 文档 ;}}

批量配置类:

@组态@EnableBatchProcessing@EnableScheduling公共 类 BatchConfiguration {private  static  final  String  SCANDINAVIAN_COUNTRIES_JSON_FILE=  “scandinavian-countries.json” ;private  static  final  String  EVENT_COLLECTION_NAME  =  “event_collection” ;private  static  final  Logger  LOG  =  LoggerFactory。getLogger(BatchConfiguration。类);@Autowiredprivate  JobBuilderFactoryjobBuilderFactory ;@Autowiredprivate  StepBuilderFactory  stepBuilderFactory ;@Autowired私有的 MLabMongoService  mlabMongoService ;@豆public  ResourcelessTransactionManagertransactionManager(){返回 新的 ResourcelessTransactionManager();}@豆public  MapJobRepositoryFactoryBean  mapJobRepositoryFactory(ResourcelessTransactionManagertxManager)抛出 异常 {MapJobRepositoryFactoryBean  factory  =newMapJobRepositoryFactoryBean(txManager);工厂。afterPropertiesSet();返回 工厂 ;}@豆public  JobRepository  jobRepository(MapJobRepositoryFactoryBeanfactory)抛出 异常 {return(JobRepository)工厂。getObject();}private  SimpleJobLauncher  jobLauncher ;@豆public  SimpleJobLauncher  jobLauncher(JobRepository  jobRepository){jobLauncher。setJobRepository(jobRepository);returnjobLauncher ;}@PostConstructprivate  void  initJobLauncher(){jobLauncher  =  new  SimpleJobLauncher();}@豆FlatFileItemReader < List < BCountry >>  reader(){FlatFileItemReader < List < BCountry >>  reader  =  newFlatFileItemReader <>();读者。setName(“scandinaviandCountriesReader”);读者。setResource(newClassPathResource(SCANDINAVIAN_COUNTRIES_JSON_FILE));读者。setLineMapper(new  BCountryJsonLineMapper());回报 读者 ;}@豆publicItemWriter <List < Document >>  writer(){返回 新的 ItemWriter < List < Document >>(){@覆盖public  void  write(List<? extendsList < Document >>  items)抛出 Exception {尝试 {如果(!CollectionUtils。的isEmpty(项目)&&  项目。大小()>  0){List< Document >  flatDocs  =  items。stream()。flatMap(List:: stream)。收集(收藏家。toList());mlabMongoService。insertMany(EVENT_COLLECTION_NAME,flatDocs);}else {记录。警告(“没有事件可以救......”);}} catchBusinessExceptione){抛出 新的 RuntimeException(e);}}};}@豆public  BCountryTimeEventItemProcessorprocessor(){返回 新的 BCountryTimeEventItemProcessor();}@豆public  Job  scandvTimeJob(){返回 jobBuilderFactory。get(“scandvTimeJob”)。incrementmenter(new  RunIdIncrementer())。流程(step1())。结束()。build();}@豆public  Stepstep1(){返回 stepBuilderFactory。得到(“step1”)。<List < BCountry >,List < Document >> chunk(10)。读者(读者())。处理器(处理器())。作家(作家())。build();}// end :: jobstep []  //每天午夜15分钟@Scheduled(cron  =  “0 15 0 * * *”)public  void  startScandvEventTimeJob()throwsException {记录。info(“====>工作开始时间:”  +  新 日期());JobParameters  param  =  newJobParametersBuilder()。addString(“作业ID” ,字符串。的valueOf(系统。的currentTimeMillis()))。toJobParameters();JobExecution执行 =  jobLauncher。run(scandvPrayTimeJob(),param);记录。信息(“====>工作完成了状态:”  +  执行。的getStatus());}}

部署de Batch到Heroku:

git add。gitcommit -m  “Deploy Batch”gitpush heroku master

注意:要禁用默认批量启动,请将此添加到application.yml

s p r i n g:  b a t c h:    jo b:      Ë Ñ 一个b 升é d:˚F 一升小号ë

到此,相信大家对“如何将数据从Web服务处理到MongoDB中”有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Redis中的布隆过滤器怎么实现
下一篇:redis层级结构是怎样的
相关文章