Introduction into the Java HTTP REST client for Elasticsearch Alexander Reelsen Community Advocate alex@elastic.co | @spinscale
A presentation at Workshop in February 2020 in Munich, Germany by Alexander Reelsen
Agenda: History & Background; Getting up and running; Executing operations
Elastic Stack
Elasticsearch in 10 seconds: search engine (full-text search, analytics, geo), near real-time; distributed, scalable, highly available, resilient; interface: HTTP & JSON; heart of the Elastic Stack (Kibana, Logstash, Beats)
History: the TransportClient has existed since the earliest versions. It uses port 9300, just like node-to-node communication, and a custom serialization (with backwards-compatibility (BWC) guarantees). All the query builders are available and all POJOs are usable.
Implementation: the high-level client depends on the Elasticsearch core project; based on the Apache HTTP client (works on Java 8), so you might want to consider shading it; supports synchronous calls and cancellable asynchronous calls; thread-safe; two clients: the low-level RestClient and the RestHighLevelClient
Architecture
Low-level REST client: TLS setup, basic auth, sniffing, node selectors
High-level REST client: request classes for all endpoints, builders for queries/aggregations
Compatibility: requires Java 8; forward compatible: a 7.0 client works with 7.x clusters, a 7.1 client with 7.1 to 7.x; also upgrade the REST client when upgrading the ES cluster (bug fixes, REST API breaking changes across major versions); upgrade the client last
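The compatibility rule can be written down as a small check. ClientCompatibility is a hypothetical helper, not part of any Elasticsearch library, and it assumes version strings of the form major.minor[.patch]: same major version, and a client minor version no newer than the cluster's.

```java
// Hypothetical helper, not part of the Elasticsearch client: encodes the rule of thumb
// that a client works with clusters on the same major version and an equal or newer minor.
final class ClientCompatibility {

    private ClientCompatibility() {}

    // Parses "major.minor[.patch]" into {major, minor}.
    static int[] majorMinor(String version) {
        String[] parts = version.split("\\.");
        if (parts.length < 2) {
            throw new IllegalArgumentException("Expected major.minor[.patch], got " + version);
        }
        return new int[] {Integer.parseInt(parts[0]), Integer.parseInt(parts[1])};
    }

    // True if a client with clientVersion is expected to work against clusterVersion.
    static boolean isCompatible(String clientVersion, String clusterVersion) {
        int[] client = majorMinor(clientVersion);
        int[] cluster = majorMinor(clusterVersion);
        return client[0] == cluster[0] && client[1] <= cluster[1];
    }

    public static void main(String[] args) {
        System.out.println(isCompatible("7.0.0", "7.6.0")); // true
        System.out.println(isCompatible("7.1.0", "7.0.1")); // false, client newer than cluster
        System.out.println(isCompatible("6.8.0", "7.6.0")); // false, different major version
    }
}
```

This is also why the client gets upgraded last: a client on a newer minor version than the cluster falls outside the rule.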
Instantiating a client
RestHighLevelClient client = new RestHighLevelClient(
    RestClient.builder(new HttpHost("localhost", 9200)));

// synchronous call, blocks until the response arrives
ClusterHealthResponse response = client.cluster()
    .health(new ClusterHealthRequest(), RequestOptions.DEFAULT);

// asynchronous call, the listener is invoked once the response (or a failure) arrives
ActionListener<ClusterHealthResponse> listener = ActionListener.<ClusterHealthResponse>wrap(
    r -> System.out.println(r.getStatus()),
    Throwable::printStackTrace);
client.cluster()
    .healthAsync(new ClusterHealthRequest(), RequestOptions.DEFAULT, listener);

// reuse the (thread-safe) client, call client.close() on application shutdown
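A listener-style async call can be bridged to a CompletableFuture, which composes well when you "think async". This is a minimal JDK-only sketch: Listener is a hypothetical stand-in modelled on ActionListener's onResponse/onFailure methods, and AsyncAdapter is an illustrative name. With the real client you would implement ActionListener instead.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical stand-in for org.elasticsearch.action.ActionListener so that the sketch
// compiles with the JDK alone.
interface Listener<T> {
    void onResponse(T response);
    void onFailure(Exception e);
}

final class AsyncAdapter {

    private AsyncAdapter() {}

    // Runs a callback-style async call and exposes its outcome as a CompletableFuture.
    // With the real client, the listener type would be ActionListener and asyncCall
    // something like l -> client.cluster().healthAsync(request, RequestOptions.DEFAULT, l).
    static <T> CompletableFuture<T> toFuture(Consumer<Listener<T>> asyncCall) {
        CompletableFuture<T> future = new CompletableFuture<>();
        asyncCall.accept(new Listener<T>() {
            @Override
            public void onResponse(T response) {
                future.complete(response);
            }

            @Override
            public void onFailure(Exception e) {
                future.completeExceptionally(e);
            }
        });
        return future;
    }

    public static void main(String[] args) {
        // Fake async call answering on another thread, standing in for healthAsync.
        CompletableFuture<String> status =
            toFuture(l -> new Thread(() -> l.onResponse("GREEN")).start());
        System.out.println(status.thenApply(String::toLowerCase).join()); // green
    }
}
```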
Basic Auth
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
    new UsernamePasswordCredentials("user", "password"));

RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200))
    .setHttpClientConfigCallback(new HttpClientConfigCallback() {
        @Override
        public HttpAsyncClientBuilder customizeHttpClient(
                HttpAsyncClientBuilder httpClientBuilder) {
            return httpClientBuilder
                .setDefaultCredentialsProvider(credentialsProvider);
        }
    });
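For reference, HTTP basic authentication (RFC 7617) sends the credentials as a base64-encoded user:password pair in the Authorization header. The sketch below (BasicAuthHeader is a hypothetical name) only computes that value with the JDK; with the REST client, the Apache HTTP client derives it from the CredentialsProvider configured above.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Illustration only: computes the Authorization header value for HTTP basic auth.
final class BasicAuthHeader {

    private BasicAuthHeader() {}

    static String value(String user, String password) {
        String token = user + ":" + password;
        return "Basic " + Base64.getEncoder()
            .encodeToString(token.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        System.out.println(value("user", "password")); // Basic dXNlcjpwYXNzd29yZA==
    }
}
```

Base64 is an encoding, not encryption, so basic auth should be combined with TLS.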
TLS
KeyStore truststore = KeyStore.getInstance("jks");
try (InputStream is = Files.newInputStream(keyStorePath)) {
    truststore.load(is, keyStorePass.toCharArray());
}
SSLContextBuilder sslBuilder = SSLContexts.custom()
    .loadTrustMaterial(truststore, null);
final SSLContext sslContext = sslBuilder.build();

RestClientBuilder builder = RestClient.builder(
        new HttpHost("localhost", 9200, "https"))
    .setHttpClientConfigCallback(new HttpClientConfigCallback() {
        @Override
        public HttpAsyncClientBuilder customizeHttpClient(
                HttpAsyncClientBuilder httpClientBuilder) {
            return httpClientBuilder.setSSLContext(sslContext);
        }
    });
Indexing
final byte[] bytes = …;
final IndexRequest request = new IndexRequest(index);
request.id("my_id"); // optional, Elasticsearch generates an id if none is set
request.source(bytes, XContentType.JSON);
final IndexResponse response = client.index(request, RequestOptions.DEFAULT);
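Roughly speaking, the high-level client turns an IndexRequest into a single HTTP call, and the optional id decides which one. The following is an illustrative assumption about the 7.x mapping for the default op_type, not the client's code: with an id, the document is sent via PUT to /{index}/_doc/{id}; without one, via POST to /{index}/_doc, and Elasticsearch generates the id. IndexEndpoint is a hypothetical helper.

```java
// Hypothetical helper, not part of the client: describes the HTTP call an IndexRequest
// roughly corresponds to in 7.x for the default op_type.
// Path segments are not URL-encoded in this sketch.
final class IndexEndpoint {

    private IndexEndpoint() {}

    static String describe(String index, String id) {
        if (id == null || id.isEmpty()) {
            // No id: Elasticsearch generates one.
            return "POST /" + index + "/_doc";
        }
        return "PUT /" + index + "/_doc/" + id;
    }

    public static void main(String[] args) {
        System.out.println(describe("products", "my_id")); // PUT /products/_doc/my_id
        System.out.println(describe("products", null));    // POST /products/_doc
    }
}
```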
Bulk indexing
BulkRequest request = new BulkRequest();
request.add(new IndexRequest("my_index").id("1")
    .source(XContentType.JSON, "name", "My first product"));
request.add(new IndexRequest("my_index").id("2")
    .source(XContentType.JSON, "name", "My second product"));
request.add(new IndexRequest("my_index").id("3")
    .source(XContentType.JSON, "name", "My third product"));
BulkResponse response = client.bulk(request, RequestOptions.DEFAULT);
// a bulk request can partially fail, so check the individual items
if (response.hasFailures()) {
    System.err.println(response.buildFailureMessage());
}
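Under the hood, a bulk request is a newline-delimited JSON (NDJSON) body: an action line followed by a source line per document, each terminated by a newline. The client builds this body for you. The sketch (BulkBody is a hypothetical name) only illustrates the wire format for index actions, and its escaping handles quotes and backslashes only, which is a simplifying assumption.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration of the bulk NDJSON format, not the client's serializer.
final class BulkBody {

    private BulkBody() {}

    // Minimal JSON string quoting: escapes backslashes and double quotes only.
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // docs maps each document id to the value of its "name" field.
    static String build(String index, Map<String, String> docs) {
        StringBuilder body = new StringBuilder();
        for (Map.Entry<String, String> doc : docs.entrySet()) {
            // action line: which operation, on which index and id
            body.append("{\"index\":{\"_index\":").append(quote(index))
                .append(",\"_id\":").append(quote(doc.getKey())).append("}}\n");
            // source line: the document itself
            body.append("{\"name\":").append(quote(doc.getValue())).append("}\n");
        }
        return body.toString();
    }

    public static void main(String[] args) {
        Map<String, String> docs = new LinkedHashMap<>();
        docs.put("1", "My first product");
        docs.put("2", "My second product");
        System.out.print(build("my_index", docs));
    }
}
```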
Bulk Processor: bulk requests should be limited by size and/or number of documents. For continuous indexing this requires manual bookkeeping, so use the built-in BulkProcessor instead.
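Bulk Processor
BulkProcessor bulkProcessor = BulkProcessor.builder(
        // hands each bulk request to the high-level client
        (request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
        new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) { … }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { … }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) { … }
        })
    .setBulkActions(10000)                              // flush after 10,000 actions...
    .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) // ...or 5 MB of data...
    .setFlushInterval(TimeValue.timeValueSeconds(5))    // ...or every 5 seconds
    .setConcurrentRequests(1)  // one bulk may execute while new requests accumulate
    .setBackoffPolicy(         // retry rejected bulks: 100 ms initial delay, up to 3 retries
        BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
    .build();
// on shutdown, flush pending requests: bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);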
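The count and size thresholds can be illustrated with a small accumulator that flushes as soon as either limit is reached and flushes the remainder on close. Batcher is a hypothetical, single-threaded sketch, not the BulkProcessor implementation: it ignores the flush interval, concurrent requests and backoff.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of "flush by number of actions or by bytes"; not thread-safe.
final class Batcher implements AutoCloseable {
    private final int maxActions;
    private final long maxBytes;
    private final Consumer<List<byte[]>> sink; // receives each completed batch
    private List<byte[]> pending = new ArrayList<>();
    private long pendingBytes = 0;

    Batcher(int maxActions, long maxBytes, Consumer<List<byte[]>> sink) {
        this.maxActions = maxActions;
        this.maxBytes = maxBytes;
        this.sink = sink;
    }

    void add(byte[] doc) {
        pending.add(doc);
        pendingBytes += doc.length;
        if (pending.size() >= maxActions || pendingBytes >= maxBytes) {
            flush();
        }
    }

    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        List<byte[]> batch = pending;
        pending = new ArrayList<>();
        pendingBytes = 0;
        sink.accept(batch);
    }

    @Override
    public void close() {
        flush(); // do not lose the last, partially filled batch
    }

    public static void main(String[] args) {
        List<Integer> batchSizes = new ArrayList<>();
        try (Batcher batcher = new Batcher(3, 1024, batch -> batchSizes.add(batch.size()))) {
            for (int i = 0; i < 7; i++) {
                batcher.add(new byte[100]);
            }
        }
        System.out.println(batchSizes); // [3, 3, 1]
    }
}
```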
Searching
final String input = "my nice query";
final SearchRequest searchRequest = new SearchRequest();
final SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.multiMatchQuery(input, "name", "description"));
searchRequest.source(searchSourceBuilder);
final SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
// no hits
if (response.getHits().getTotalHits().value == 0) {
    return;
}
for (SearchHit hit : response.getHits().getHits()) {
    // per hit processing
}
Use builders for queries: QueryBuilders.* for queries, AggregationBuilders.* for aggregations, PipelineAggregatorBuilders.* for pipeline aggregations
Pagination using from / size
private SearchRequest createSearchRequest(String input, int from, int size) {
    final SearchRequest searchRequest = new SearchRequest();
    final SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.from(from); // offset of the first hit to return
    searchSourceBuilder.size(size); // number of hits to return
    searchSourceBuilder.query(QueryBuilders.multiMatchQuery(input, "name", "description"));
    searchRequest.source(searchSourceBuilder);
    return searchRequest;
}
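The from offset for a given page follows from (page - 1) * size. By default Elasticsearch limits from + size to 10,000 (the index.max_result_window setting), so deep pages need search_after instead. Paging is a hypothetical helper that assumes 1-based page numbers and the default window.

```java
// Hypothetical helper: converts a 1-based page number into the from offset for
// from/size pagination and rejects pages beyond the default result window.
final class Paging {

    // Default of the index.max_result_window setting: from + size may not exceed it.
    static final int DEFAULT_MAX_RESULT_WINDOW = 10_000;

    private Paging() {}

    static int fromForPage(int page, int size) {
        if (page < 1 || size < 1) {
            throw new IllegalArgumentException("page and size must be >= 1");
        }
        long from = (long) (page - 1) * size; // long avoids int overflow for huge pages
        if (from + size > DEFAULT_MAX_RESULT_WINDOW) {
            throw new IllegalArgumentException(
                "from + size exceeds " + DEFAULT_MAX_RESULT_WINDOW + ", consider search_after");
        }
        return (int) from;
    }

    public static void main(String[] args) {
        System.out.println(fromForPage(1, 10));    // 0
        System.out.println(fromForPage(3, 20));    // 40
        System.out.println(fromForPage(1000, 10)); // 9990, the last page that still fits
    }
}
```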
Pagination using search_after
SearchRequest searchRequest = new SearchRequest(INDEX);
searchRequest.source().query(QueryBuilders.matchQuery("name", "Name"));
// consider adding a unique tiebreaker field to the sort,
// otherwise hits with the same price may be skipped or repeated
searchRequest.source().sort(SortBuilders.fieldSort("price").order(SortOrder.DESC));
final SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);

// next page: same query and sort, continuing after the sort values of the last hit
SearchRequest searchAfterRequest = new SearchRequest(INDEX);
searchAfterRequest.source().query(QueryBuilders.matchQuery("name", "Name"));
searchAfterRequest.source().sort(SortBuilders.fieldSort("price").order(SortOrder.DESC));
SearchHit[] hits = response.getHits().getHits();
SearchHit lastHit = hits[hits.length - 1]; // assumes the first page was not empty
searchAfterRequest.source().searchAfter(lastHit.getSortValues());
final SearchResponse searchAfterResponse = client.search(searchAfterRequest, RequestOptions.DEFAULT);
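The idea behind search_after can be emulated in memory: sort, then return only hits that sort strictly after the sort values of the last hit. This sketch (SearchAfterDemo and Product are hypothetical) sorts by price descending with the id as a unique tiebreaker, the kind of tiebreaker the Elasticsearch documentation recommends; without one, hits with equal sort values could be skipped or returned twice.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// In-memory emulation of search_after; no Elasticsearch involved.
final class SearchAfterDemo {

    static final class Product {
        final String id;
        final double price;

        Product(String id, double price) {
            this.id = id;
            this.price = price;
        }
    }

    // Sort by price descending, then by the unique id as a tiebreaker.
    static final Comparator<Product> ORDER =
        Comparator.comparingDouble((Product p) -> p.price).reversed()
            .thenComparing((Product p) -> p.id);

    // Returns up to size products sorting strictly after lastHit (null for the first page).
    static List<Product> page(List<Product> all, Product lastHit, int size) {
        List<Product> sorted = new ArrayList<>(all);
        sorted.sort(ORDER);
        List<Product> result = new ArrayList<>();
        for (Product p : sorted) {
            if (result.size() == size) {
                break;
            }
            if (lastHit == null || ORDER.compare(p, lastHit) > 0) {
                result.add(p);
            }
        }
        return result;
    }

    static List<String> ids(List<Product> products) {
        List<String> ids = new ArrayList<>();
        for (Product p : products) {
            ids.add(p.id);
        }
        return ids;
    }

    public static void main(String[] args) {
        List<Product> all = Arrays.asList(
            new Product("a", 10), new Product("b", 20), new Product("c", 20), new Product("d", 5));
        List<Product> first = page(all, null, 2);
        List<Product> second = page(all, first.get(first.size() - 1), 2);
        System.out.println(ids(first));  // [b, c]
        System.out.println(ids(second)); // [a, d]
    }
}
```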
Node Selectors (Low Level REST client)
final NodeSelector INGEST_NODE_SELECTOR = nodes -> {
    final Iterator<Node> iterator = nodes.iterator();
    while (iterator.hasNext()) {
        Node node = iterator.next();
        // roles may be null if they are unknown, so such nodes are kept
        if (node.getRoles() != null && node.getRoles().isIngest() == false) {
            iterator.remove();
        }
    }
};
HttpHost host = new HttpHost("localhost", 9200, "http");
final RestClientBuilder builder = RestClient.builder(host);
builder.setNodeSelector(INGEST_NODE_SELECTOR);
Sniffers (Low Level REST client): fetch the list of current nodes; use a background thread (requires proper shutdown); configurable interval; optional: sniffing on failure; separate dependency (elasticsearch-rest-client-sniffer)
Sniffers (Low Level REST client)
RestClient restClient = RestClient.builder(
        new HttpHost("localhost", 9200, "http"))
    .build();
Sniffer sniffer = Sniffer.builder(restClient).build();
// don't forget to close both: the sniffer first, right before the client
sniffer.close();
restClient.close();
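In Java, try-with-resources closes resources in reverse declaration order, so declaring the client before the sniffer closes the sniffer first. Since RestClient and Sniffer both implement Closeable, the same pattern should apply to them. The sketch uses hypothetical stand-ins (FakeClient, Refresher, ShutdownOrderDemo) so it runs with the JDK alone; Refresher mimics a sniffer's periodic background thread with a ScheduledExecutorService.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for RestClient: only records when it is closed.
final class FakeClient implements AutoCloseable {
    private final List<String> log;

    FakeClient(List<String> log) {
        this.log = log;
    }

    @Override
    public void close() {
        log.add("client closed");
    }
}

// Hypothetical stand-in for Sniffer: a background thread doing periodic work.
final class Refresher implements AutoCloseable {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final AtomicInteger runs = new AtomicInteger();
    private final List<String> log;

    Refresher(List<String> log, long intervalMillis) {
        this.log = log;
        // Periodic task, standing in for "fetch the current list of nodes".
        scheduler.scheduleAtFixedRate(runs::incrementAndGet, 0, intervalMillis, TimeUnit.MILLISECONDS);
    }

    boolean isTerminated() {
        return scheduler.isTerminated();
    }

    @Override
    public void close() {
        scheduler.shutdown(); // periodic tasks are cancelled, the thread can exit
        try {
            scheduler.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        log.add("refresher closed");
    }
}

final class ShutdownOrderDemo {
    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        // Closed in reverse declaration order: refresher first, then client.
        try (FakeClient client = new FakeClient(log);
             Refresher refresher = new Refresher(log, 100)) {
            // use the client here
        }
        System.out.println(log); // [refresher closed, client closed]
    }
}
```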
Remember, remember… Consider shading the client. It is modular, so certain features require additional dependencies: check your dependencies.
Deprecation warnings
Treat responses containing deprecation warnings as errors (useful when testing upgrades)
final RestClientBuilder builder = RestClient.builder(host);
builder.setStrictDeprecationMode(true);
Using it against Elastic Cloud
String cloudId = "optionalHumanReadableName:"
    + "dXMtZWFzdC0xLmF3cy5mb3VuZC5pbyRlbGFzdGljc2VhcmNoJGtpYmFuYQ==";
final RestClientBuilder builder = RestClient.builder(cloudId);
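A Cloud ID is a base64-encoded string, optionally prefixed by a human-readable name and a colon. Decoded, it has the form domain$elasticsearchId$kibanaId, and the Elasticsearch endpoint is elasticsearchId.domain on HTTPS port 443. The following is a simplified sketch of that decoding (CloudId is a hypothetical name); ignoring a port embedded in the domain part is an assumption of the sketch.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Simplified sketch of Cloud ID decoding; not the client's implementation.
final class CloudId {

    private CloudId() {}

    static String elasticsearchHost(String cloudId) {
        // Strip the optional "name:" prefix.
        int colon = cloudId.indexOf(':');
        String encoded = colon >= 0 ? cloudId.substring(colon + 1) : cloudId;
        // Decodes to "domain$elasticsearchId$kibanaId".
        String decoded = new String(Base64.getDecoder().decode(encoded), StandardCharsets.UTF_8);
        String[] parts = decoded.split("\\$");
        if (parts.length < 2) {
            throw new IllegalArgumentException("Unexpected cloud id format: " + decoded);
        }
        return parts[1] + "." + parts[0];
    }

    public static void main(String[] args) {
        String cloudId = "optionalHumanReadableName:"
            + "dXMtZWFzdC0xLmF3cy5mb3VuZC5pbyRlbGFzdGljc2VhcmNoJGtpYmFuYQ==";
        System.out.println(elasticsearchHost(cloudId)); // elasticsearch.us-east-1.aws.found.io
    }
}
```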
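Debugging & Logging: the Apache HTTP client logs via Apache Commons Logging (JCL); set org.elasticsearch.client=DEBUG for more logging; set org.apache.http.wire=TRACE to log whole HTTP requests. Mind privacy: the wire log includes headers (credentials) and request bodies.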
Demo: uses Testcontainers; demo code available on GitHub
Summary
Summary: Think async! Sniffing is your friend! Read the documentation! With a small code change, you can either start a Docker container or run against another cluster!
Elastic Cloud
Elastic Support Subscriptions
Getting more help
Discuss Forum https://discuss.elastic.co
Community & Meetups https://community.elastic.co
Official Elastic Training https://training.elastic.co
Thanks for listening Q&A Alexander Reelsen Community Advocate alex@elastic.co | @spinscale