Stubs for adding throttle support.

This commit is contained in:
Richard Moore 2019-11-23 21:21:27 +09:00
parent abab9f6aa2
commit 2f0e679f0b
No known key found for this signature in database
GPG Key ID: 665176BE8E9DC651

View File

@ -16,6 +16,7 @@ export type ConnectionInfo = {
user?: string,
password?: string,
allowInsecureAuthentication?: boolean,
throttleLimit?: number,
timeout?: number,
headers?: { [key: string]: string | number }
};
@ -42,6 +43,24 @@ export type FetchJsonResponse = {
type Header = { key: string, value: string };
function getResponse(response: Response): FetchJsonResponse {
const headers: { [ header: string ]: string } = { };
if (response.headers.forEach) {
response.headers.forEach((value, key) => {
headers[key.toLowerCase()] = value;
});
} else {
(<() => Array<string>>((<any>(response.headers)).keys))().forEach((key) => {
headers[key.toLowerCase()] = response.headers.get(key);
});
}
return {
statusCode: response.status,
status: response.statusText,
headers: headers
};
}
export function fetchJson(connection: string | ConnectionInfo, json?: string, processFunc?: (value: any, response: FetchJsonResponse) => any): Promise<any> {
const headers: { [key: string]: Header } = { };
@ -60,6 +79,8 @@ export function fetchJson(connection: string | ConnectionInfo, json?: string, pr
let allow304 = false;
let timeout = 2 * 60 * 1000;
let throttle = 25;
if (options.throttleLimit) { throttle = options.throttleLimit; }
if (typeof(connection) === "string") {
url = connection;
@ -101,103 +122,105 @@ export function fetchJson(connection: string | ConnectionInfo, json?: string, pr
}
}
return new Promise(function(resolve, reject) {
if (json) {
options.method = "POST";
options.body = json;
headers["content-type"] = { key: "Content-Type", value: "application/json" };
}
const flatHeaders: { [ key: string ]: string } = { };
Object.keys(headers).forEach((key) => {
const header = headers[key];
flatHeaders[header.key] = header.value;
});
options.headers = flatHeaders;
const runningTimeout = (function() {
let timer: any = null;
if (timeout) {
timer = setTimeout(() => {
if (timer == null) { return; }
timer = null;
const promise = new Promise(function(resolve, reject) {
if (timeout) {
timer = setTimeout(() => {
if (timer == null) { return; }
timer = null;
reject(logger.makeError("timeout", Logger.errors.TIMEOUT, { timeout: timeout }));
}, timeout);
}
reject(logger.makeError("timeout", Logger.errors.TIMEOUT, { timeout: timeout }));
}, timeout);
}
});
const cancelTimeout = () => {
const cancel = function() {
if (timer == null) { return; }
clearTimeout(timer);
timer = null;
}
if (json) {
options.method = "POST";
options.body = json;
headers["content-type"] = { key: "Content-Type", value: "application/json" };
return { promise, cancel };
})();
if (throttle == 100) {
console.log(throttle);
}
const runningFetch = (async function() {
let response: Response = null;
let body: string = null;
while (true) {
try {
response = await fetch(url, options);
} catch (error) {
console.log(error);
}
body = await response.text();
if (allow304 && response.status === 304) {
// Leave body as null
break;
} else if (!response.ok) {
runningTimeout.cancel();
logger.throwError("bad response", Logger.errors.SERVER_ERROR, {
status: response.status,
body: body,
type: response.type,
url: response.url
});
} else {
break;
}
}
const flatHeaders: { [ key: string ]: string } = { };
Object.keys(headers).forEach((key) => {
const header = headers[key];
flatHeaders[header.key] = header.value;
});
options.headers = flatHeaders;
runningTimeout.cancel();
return fetch(url, options).then((response) => {
return response.text().then((body) => {
let json: any = null;
if (body != null) {
try {
json = JSON.parse(body);
} catch (error) {
logger.throwError("invalid JSON", Logger.errors.SERVER_ERROR, {
body: body,
error: error,
url: url
});
}
}
let json: any = null;
if (processFunc) {
try {
json = await processFunc(json, getResponse(response));
} catch (error) {
logger.throwError("processing response error", Logger.errors.SERVER_ERROR, {
body: json,
error: error
});
}
}
if (allow304 && response.status === 304) {
// Leave json as null
return json;
})();
} else if (!response.ok) {
logger.throwError("bad response", Logger.errors.SERVER_ERROR, {
status: response.status,
body: body,
type: response.type,
url: response.url
});
} else {
try {
json = JSON.parse(body);
} catch (error) {
logger.throwError("invalid JSON", Logger.errors.SERVER_ERROR, {
body: body,
error: error,
url: url
});
}
}
if (processFunc) {
try {
const headers: { [ header: string ]: string } = { };
if (response.headers.forEach) {
response.headers.forEach((value, key) => {
headers[key.toLowerCase()] = value;
});
} else {
(<() => Array<string>>((<any>(response.headers)).keys))().forEach((key) => {
headers[key.toLowerCase()] = response.headers.get(key);
});
}
json = processFunc(json, {
statusCode: response.status,
status: response.statusText,
headers: headers
});
} catch (error) {
logger.throwError("processing response error", Logger.errors.SERVER_ERROR, {
body: json,
error: error
});
}
}
return json;
});
}, (error) => {
throw error;
}).then((result) => {
cancelTimeout();
resolve(result);
}, (error) => {
cancelTimeout();
reject(error);
});
});
return Promise.race([ runningTimeout.promise, runningFetch ]);
}
export function poll(func: () => Promise<any>, options?: PollOptions): Promise<any> {