-
-
Save parthdesai93/1bd3a25ad4cf788d49ce4a00a1bb3268 to your computer and use it in GitHub Desktop.
/* requires AWS creds to be updated. | |
* if they aren't, update using AWS.config.update() method before instatiing the client. | |
* | |
* import this module where you instantiate the client, and simply pass this module as the connection class. | |
* | |
* eg: | |
* const client = new Client({ | |
* node, | |
* Connection: AwsConnector | |
* }); | |
*/ | |
import AWS from 'aws-sdk'; | |
import { Connection } from '@elastic/elasticsearch'; | |
class AwsConnector extends Connection { | |
async request(params, callback) { | |
try { | |
const creds = await this.getAWSCredentials(); | |
const req = this.createRequest(params); | |
const { request: signedRequest } = this.signRequest(req, creds); | |
super.request(signedRequest, callback); | |
} catch (error) { | |
throw error; | |
} | |
} | |
createRequest(params) { | |
const endpoint = new AWS.Endpoint(this.url.href); | |
let req = new AWS.HttpRequest(endpoint); | |
Object.assign(req, params); | |
req.region = AWS.config.region; | |
if (!req.headers) { | |
req.headers = {}; | |
} | |
let body = params.body; | |
if (body) { | |
let contentLength = Buffer.isBuffer(body) | |
? body.length | |
: Buffer.byteLength(body); | |
req.headers['Content-Length'] = contentLength; | |
req.body = body; | |
} | |
req.headers['Host'] = endpoint.host; | |
return req; | |
} | |
getAWSCredentials() { | |
return new Promise((resolve, reject) => { | |
AWS.config.getCredentials((err, creds) => { | |
if (err) { | |
if (err && err.message) { | |
err.message = `AWS Credentials error: ${e.message}`; | |
} | |
reject(err); | |
} | |
resolve(creds); | |
}); | |
}); | |
} | |
signRequest(request, creds) { | |
const signer = new AWS.Signers.V4(request, 'es'); | |
signer.addAuthorization(creds, new Date()); | |
return signer; | |
} | |
} | |
export { AwsConnector }; |
import { AwsConnector } from './aws_es_connector'; | |
import { Client } from '@elastic/elasticsearch'; | |
import AWS from 'aws-sdk'; | |
// load aws keys and region, ideally using env variables. | |
let accessKey = '****'; | |
let secretKey = '****'; | |
let region = '**'; //eg: us-east-1 | |
let node = '***'; //node url | |
AWS.config.update({ | |
credentials: new AWS.Credentials(accessKey, secretKey), | |
region | |
}); | |
const client = new Client({ | |
node, | |
Connection: AwsConnector | |
}); | |
//use this client to talk to AWS managed ES service. | |
export default client; |
@bibobibo For TypeScript version, which version of AWS-SDK is compatible? Main issue is with request
method. If using version 2.506.0
the method signature requires us to return an http.ClientRequest
object - but your code does not return anything thus the error. Also the first argument's type RequestOptions
is not exported out of the type definition.
ESlint is throwing errors for this.
Can this be solved? Or perhaps I should use JS version of the code...
So far the code looks like below:
import AWS from 'aws-sdk';
import { Credentials, CredentialsOptions } from 'aws-sdk/lib/credentials';
import AwsV4Signer from 'aws-sdk/lib/signers/v4';
import { Connection } from 'es6';
import * as http from 'http';
interface RequestOptions extends http.ClientRequestArgs {
asStream?: boolean;
}
export default class AwsConnector extends Connection {
private static REGION = 'ap-south-1';
public request(options: RequestOptions, callback: (err: Error | null, response: http.IncomingMessage | null) => void): http.ClientRequest {
const httpRequest = this.mergeAwsHttpRequest(options);
AwsConnector.getAWSCredentials().then((credentials) => {
const { request: signedRequest } = AwsConnector.signRequest(httpRequest, credentials);
return super.request(signedRequest, callback);
});
return httpRequest;
}
private mergeAwsHttpRequest(options: any) {
const endpoint = new AWS.Endpoint(this.url.href);
const httpRequest = new AWS.HttpRequest(endpoint, AWS.config.region || AwsConnector.REGION);
Object.assign(httpRequest, options);
if (!httpRequest.headers) {
httpRequest.headers = {};
}
const { body } = options;
if (body) {
const contentLength = Buffer.isBuffer(body) ? body.length : Buffer.byteLength(body);
httpRequest.headers['Content-Length'] = `${contentLength}`;
httpRequest.body = body;
}
httpRequest.headers.Host = endpoint.host;
// @ts-ignore
httpRequest.path = `/?${httpRequest.querystring}`;
// @ts-ignore
delete httpRequest.querystring;
return httpRequest;
}
private static getAWSCredentials(): Promise<Credentials | CredentialsOptions | null> {
return new Promise((resolve, reject) => {
AWS.config.getCredentials((error) => {
if (error) {
return reject(error);
}
return resolve(AWS.config.credentials);
});
});
}
private static signRequest(request: AWS.HttpRequest, credentials: Credentials | CredentialsOptions | null) {
const signer = new AwsV4Signer(request, 'es');
signer.addAuthorization(credentials, new Date());
return signer;
}
}
This is my TypeScript version:
import { Connection as UnsignedConnection } from '@elastic/elasticsearch';
import * as AWS from 'aws-sdk';
import RequestSigner from 'aws-sdk/lib/signers/v4';
import { ClientRequest, RequestOptions, IncomingMessage } from 'http';
class AwsElasticsearchError extends Error {}
class AwsSignedConnection extends UnsignedConnection {
public request(
params: RequestOptions,
callback: (err: Error | null, response: IncomingMessage | null) => void,
): ClientRequest {
const signedParams = this.signParams(params);
return super.request(signedParams, callback);
}
// TODO: better type after https://github.com/elastic/elasticsearch-js/issues/951
private signParams(params: any): RequestOptions {
const region = AWS.config.region || process.env.AWS_DEFAULT_REGION;
if (!region) {
throw new AwsElasticsearchError('missing region configuration');
}
const endpoint = new AWS.Endpoint(this.url.href);
const request = new AWS.HttpRequest(endpoint, region);
request.method = params.method;
request.path = params.querystring
? `${params.path}/?${params.querystring}`
: params.path;
request.body = params.body;
request.headers = params.headers;
request.headers.Host = endpoint.host;
const signer = new RequestSigner(request, 'es');
signer.addAuthorization(AWS.config.credentials, new Date());
return request;
}
}
export { AwsSignedConnection, UnsignedConnection, AwsElasticsearchError };
We've released an NPM package that works with @elastic/eleasticsearch and AWS elasticsearch clusters. It signs the requests for AWS and refreshes the credentials when they're about to expire.
https://www.npmjs.com/package/@acuris/aws-es-connection
https://github.com/mergermarket/acuris-aws-es-connection
@fewstera Thank you for posting this. Will test it out.
@fewstera A question. You have a list of whitelisted function names that you wrap in awsCredsifyAll. How would you keep them always in sync with any new ES release?
@mohitkhanna those whitelisted keys come from the first level of this object: https://github.com/elastic/elasticsearch-js/blob/master/api/index.js#L18. So they could easily be updated using a script, but there is no automatic setup in place to currently do this.
I don't see them changing often, but I will check them occasionally to ensure they don't go out of sync. If a whitelisted key is missed, the client will continue to work, just the AWS credentials will not be refreshed by that method call.
Figured it out.
AWSHTTPRequest uses path to send query parameters and there is no key as "querystring" here in docs.
So a small change in aws_es_connector.js solves the problem.
In createRequest() before sending the req object
req.path+='/?'+req.querystring; delete req.querystring;
Now all parameters will work fine.
cool .thanks saved my day
Thank you @parthdesai93! Was very helpful
Hey. I find I spot an issue.
Look at this:
`
> w.byteLength 8 `const x = 'ߤߤߤߤ';
x.length
4
w = Buffer.from(x)
Content-Length is provided in bytes. So the header is going to have invalid value - 4 bytes instead of 8.
Line #59 has an issue:
should be
e
is not defined in that context