VICE / ModuleTests / Azure / node_modules / @azure / data-tables / dist-esm / src / TableTransaction.js
TableTransaction.js
Raw
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
import { isNamedKeyCredential, isSASCredential, isTokenCredential, } from "@azure/core-auth";
import { ServiceClient, serializationPolicy, serializationPolicyName, } from "@azure/core-client";
import { RestError, createHttpHeaders, createPipelineRequest, } from "@azure/core-rest-pipeline";
import { getInitialTransactionBody, getTransactionHttpRequestBody, } from "./utils/transactionHelpers";
import { transactionHeaderFilterPolicy, transactionHeaderFilterPolicyName, transactionRequestAssemblePolicy, transactionRequestAssemblePolicyName, } from "./TablePolicies";
import { SpanStatusCode } from "@azure/core-tracing";
import { cosmosPatchPolicy } from "./cosmosPathPolicy";
import { createSpan } from "./utils/tracing";
import { getAuthorizationHeader } from "./tablesNamedCredentialPolicy";
import { getTransactionHeaders } from "./utils/transactionHeaders";
import { isCosmosEndpoint } from "./utils/isCosmosEndpoint";
import { signURLWithSAS } from "./tablesSASTokenPolicy";
import { STORAGE_SCOPE } from "./utils/constants";
/**
 * Builder that accumulates the list of actions making up a transaction.
 *
 * Every action is recorded in `actions` as an array whose first element is
 * the action name ("create", "delete", "update" or "upsert"), followed by the
 * action's arguments.
 */
export class TableTransaction {
    /**
     * @param actions - optional list to record actions into. It is stored by
     * reference (not copied); a new empty list is used when it is null or undefined.
     */
    constructor(actions) {
        // `== null` is true for both null and undefined.
        this.actions = actions == null ? [] : actions;
    }
    /**
     * Records a "create" action.
     * @param entity - entity to create
     */
    createEntity(entity) {
        const action = ["create", entity];
        this.actions.push(action);
    }
    /**
     * Records a "delete" action.
     * @param partitionKey - partition key of the entity to delete
     * @param rowKey - rowKey of the entity to delete
     */
    deleteEntity(partitionKey, rowKey) {
        const target = { partitionKey, rowKey };
        this.actions.push(["delete", target]);
    }
    /**
     * Records an "update" action.
     * @param entity - entity to update
     * @param updateMode - update mode, "Merge" when omitted
     */
    updateEntity(entity, updateMode = "Merge") {
        const action = ["update", entity, updateMode];
        this.actions.push(action);
    }
    /**
     * Records an "upsert" action, which inserts if the entity doesn't exist or updates the existing one.
     * @param entity - entity to upsert
     * @param updateMode - update mode, "Merge" when omitted
     */
    upsertEntity(entity, updateMode = "Merge") {
        const action = ["upsert", entity, updateMode];
        this.actions.push(action);
    }
}
/**
 * Collects the sub-operations of a single transaction and submits them
 * together, as one `$batch` request, via submitTransaction.
 *
 * Operations are issued against `interceptClient`, whose pipeline is rewired
 * by `prepateTransactionPipeline` when the shared state is (re)initialized.
 */
export class InternalTableTransaction {
    /**
     * @param url - Tables account url. When no credential is given, any query string
     * on it (e.g. a SAS token) is moved after the `$batch` path segment.
     * @param partitionKey - partition key that every operation in the transaction must target
     * @param transactionId - transaction id, used for the request body and headers
     * @param changesetId - changeset id, used for the request body and the intercepting pipeline
     * @param clientOptions - options for the ServiceClient that sends the transaction request
     * @param interceptClient - client whose pipeline is prepared for the transaction and
     * whose entity methods are invoked for each added operation
     * @param credential - credential to authenticate the transaction request
     * @param allowInsecureConnection - forwarded to the transaction's pipeline request
     */
    constructor(url, partitionKey, transactionId, changesetId, clientOptions, interceptClient, credential, allowInsecureConnection = false) {
        this.clientOptions = clientOptions;
        this.credential = credential;
        this.url = url;
        this.interceptClient = interceptClient;
        this.allowInsecureConnection = allowInsecureConnection;
        // Initialize Reset-able properties. This must run before the url is rewritten
        // below, because the Cosmos endpoint check reads the url as provided.
        this.resetableState = this.initializeSharedState(transactionId, changesetId, partitionKey);
        // Depending on the auth method used we need to build the url
        if (!credential) {
            // When the SAS token is provided as part of the URL we need to move it after $batch
            const urlParts = url.split("?");
            this.url = urlParts[0];
            const sas = urlParts.length > 1 ? `?${urlParts[1]}` : "";
            this.url = `${this.getUrlWithSlash()}$batch${sas}`;
        }
        else {
            // With an explicit credential object the url is only extended with $batch;
            // authentication is applied to the request in submitTransaction.
            this.url = `${this.getUrlWithSlash()}$batch`;
        }
    }
    /**
     * Resets the state of the Transaction: pending operations and body parts are
     * discarded and the intercept client's pipeline is prepared again.
     * @param transactionId - new transaction id
     * @param changesetId - new changeset id
     * @param partitionKey - partition key the next operations must target
     */
    reset(transactionId, changesetId, partitionKey) {
        this.resetableState = this.initializeSharedState(transactionId, changesetId, partitionKey);
    }
    /**
     * Builds a fresh reset-able state and prepares the intercept client's pipeline
     * so that it works against the new body parts.
     * @returns the new state (ids, partition key, pending operations and body parts)
     */
    initializeSharedState(transactionId, changesetId, partitionKey) {
        const pendingOperations = [];
        const bodyParts = getInitialTransactionBody(transactionId, changesetId);
        const isCosmos = isCosmosEndpoint(this.url);
        prepateTransactionPipeline(this.interceptClient.pipeline, bodyParts, changesetId, isCosmos);
        return {
            transactionId,
            changesetId,
            partitionKey,
            pendingOperations,
            bodyParts,
        };
    }
    /**
     * Adds a createEntity operation to the transaction
     * @param entity - Entity to create
     * @throws Error when the entity targets a different partition key
     */
    createEntity(entity) {
        this.checkPartitionKey(entity.partitionKey);
        this.resetableState.pendingOperations.push(this.interceptClient.createEntity(entity));
    }
    /**
     * Adds a createEntity operation to the transaction per each entity in the entities array
     * @param entities - Array of entities to create
     * @throws Error as soon as an entity targets a different partition key; entities
     * before it in the array have already been added at that point
     */
    createEntities(entities) {
        for (const entity of entities) {
            this.createEntity(entity);
        }
    }
    /**
     * Adds a deleteEntity operation to the transaction
     * @param partitionKey - Partition key of the entity to delete
     * @param rowKey - Row key of the entity to delete
     * @param options - Options for the delete operation
     * @throws Error when partitionKey differs from the transaction's partition key
     */
    deleteEntity(partitionKey, rowKey, options) {
        this.checkPartitionKey(partitionKey);
        this.resetableState.pendingOperations.push(this.interceptClient.deleteEntity(partitionKey, rowKey, options));
    }
    /**
     * Adds an updateEntity operation to the transaction
     * @param entity - Entity to update
     * @param mode - Update mode (Merge or Replace)
     * @param options - Options for the update operation
     * @throws Error when the entity targets a different partition key
     */
    updateEntity(entity, mode, options) {
        this.checkPartitionKey(entity.partitionKey);
        this.resetableState.pendingOperations.push(this.interceptClient.updateEntity(entity, mode, options));
    }
    /**
     * Adds an upsertEntity operation to the transaction
     * @param entity - The properties for the table entity.
     * @param mode   - The different modes for updating the entity:
     *               - Merge: Updates an entity by updating the entity's properties without replacing the existing entity.
     *               - Replace: Updates an existing entity by replacing the entire entity.
     * @param options - The options parameters.
     * @throws Error when the entity targets a different partition key
     */
    upsertEntity(entity, mode, options) {
        this.checkPartitionKey(entity.partitionKey);
        this.resetableState.pendingOperations.push(this.interceptClient.upsertEntity(entity, mode, options));
    }
    /**
     * Submits the operations in the transaction
     * @returns the parsed transaction response
     * @throws whatever request creation, signing, sending or response parsing throws;
     * the error is recorded on the tracing span before being rethrown
     */
    async submitTransaction() {
        // Wait until every intercepted operation has been processed by the pipeline.
        await Promise.all(this.resetableState.pendingOperations);
        const body = getTransactionHttpRequestBody(this.resetableState.bodyParts, this.resetableState.transactionId, this.resetableState.changesetId);
        // Work on a shallow copy: the options object belongs to the caller and must not
        // pick up the credential fields below. Copying also tolerates undefined options.
        const options = Object.assign({}, this.clientOptions);
        if (isTokenCredential(this.credential)) {
            options.credentialScopes = STORAGE_SCOPE;
            options.credential = this.credential;
        }
        const client = new ServiceClient(options);
        const headers = getTransactionHeaders(this.resetableState.transactionId);
        const { span, updatedOptions } = createSpan("TableTransaction-submitTransaction", {});
        // Everything after the span is opened runs inside the try so that the span is
        // always ended, even when building or signing the request fails.
        try {
            const request = createPipelineRequest({
                url: this.url,
                method: "POST",
                body,
                headers: createHttpHeaders(headers),
                tracingOptions: updatedOptions.tracingOptions,
                allowInsecureConnection: this.allowInsecureConnection,
            });
            if (isNamedKeyCredential(this.credential)) {
                const authHeader = getAuthorizationHeader(request, this.credential);
                request.headers.set("Authorization", authHeader);
            }
            else if (isSASCredential(this.credential)) {
                signURLWithSAS(request, this.credential);
            }
            const rawTransactionResponse = await client.sendRequest(request);
            return parseTransactionResponse(rawTransactionResponse);
        }
        catch (error) {
            span.setStatus({
                code: SpanStatusCode.ERROR,
                message: error.message,
            });
            throw error;
        }
        finally {
            span.end();
        }
    }
    /**
     * Ensures an operation targets the partition key this transaction was created (or reset) with.
     * @throws Error when the partition keys differ
     */
    checkPartitionKey(partitionKey) {
        if (this.resetableState.partitionKey !== partitionKey) {
            throw new Error("All operations in a transaction must target the same partitionKey");
        }
    }
    /**
     * @returns the current url, with a trailing "/" appended when it does not already end with one
     */
    getUrlWithSlash() {
        return this.url.endsWith("/") ? this.url : `${this.url}/`;
    }
}
/**
 * Turns the raw response of a transaction request into the overall status plus
 * one entry per sub-response found in the body.
 *
 * @param transactionResponse - raw response; `status`, `bodyAsText` and `request` are read
 * @returns an object with the overall `status`, the parsed `subResponses`
 * (each with `status` and, when present in the text, `rowKey` and `etag`) and a
 * `getResponseForEntity(rowKey)` lookup over those sub-responses
 * @throws via handleBodyError when the overall status is outside 200-299 or a
 * sub-response carries a `{...}` body; Error when a sub-response status cannot be read
 */
export function parseTransactionResponse(transactionResponse) {
    const status = transactionResponse.status;
    const rawBody = transactionResponse.bodyAsText || "";
    const succeeded = status >= 200 && status < 300;
    if (!succeeded) {
        handleBodyError(rawBody, status, transactionResponse.request, transactionResponse);
    }
    const pieces = rawBody.split(`--changesetresponse_`);
    const parsed = [];
    // The first and the last piece are boundaries; only the pieces in between
    // hold sub-request content.
    for (const piece of pieces.slice(1, -1)) {
        const statusMatch = piece.match(/HTTP\/1.1 ([0-9]*)/);
        if (!statusMatch || statusMatch.length !== 2) {
            throw new Error(`Couldn't extract status from sub-response:\n ${piece}`);
        }
        // The capture group only admits decimal digits (possibly none, giving NaN).
        const subStatus = Number.parseInt(statusMatch[1], 10);
        if (!Number.isInteger(subStatus)) {
            throw new Error(`Expected sub-response status to be an integer ${subStatus}`);
        }
        // A braces-delimited body in a sub-response is treated as an error payload.
        const bodyMatch = piece.match(/\{(.*)\}/);
        if (bodyMatch && bodyMatch.length === 2) {
            handleBodyError(bodyMatch[0], subStatus, transactionResponse.request, transactionResponse);
        }
        const entry = { status: subStatus };
        const rowKeyMatch = piece.match(/RowKey='(.*)'/);
        if (rowKeyMatch && rowKeyMatch.length === 2) {
            entry.rowKey = rowKeyMatch[1];
        }
        const etagMatch = piece.match(/ETag: (.*)/);
        if (etagMatch && etagMatch.length === 2) {
            entry.etag = etagMatch[1];
        }
        parsed.push(entry);
    }
    return {
        status,
        subResponses: parsed,
        getResponseForEntity: (rowKey) => parsed.find((candidate) => candidate.rowKey === rowKey),
    };
}
/**
 * Always throws a RestError describing a failed transaction or sub-response.
 *
 * The body is parsed as JSON on a best-effort basis; when it holds an
 * "odata.error" object, that object's `message.value` and `code` are used.
 * Otherwise the message is "Transaction Failed" and the code is undefined.
 *
 * @param bodyAsText - response (or sub-response) body text
 * @param statusCode - status code to report on the error
 * @param request - request to attach to the error
 * @param response - response to attach to the error
 * @throws RestError always
 */
function handleBodyError(bodyAsText, statusCode, request, response) {
    let payload = {};
    try {
        payload = JSON.parse(bodyAsText);
    }
    catch (_parseFailure) {
        // Not JSON: fall through with an empty payload and the generic message.
        payload = {};
    }
    let message = "Transaction Failed";
    let code;
    // Only transaction sub-responses return body
    const odataError = payload ? payload["odata.error"] : undefined;
    if (odataError) {
        const detail = odataError.message;
        if (detail != null && detail.value != null) {
            message = detail.value;
        }
        code = odataError.code;
    }
    const errorOptions = { code, statusCode, request, response };
    throw new RestError(message, errorOptions);
}
/**
 * Prepares the transaction pipeline to intercept operations.
 *
 * Every policy currently on the pipeline is removed, then the serialization,
 * header-filter and request-assemble policies are added, plus the Cosmos patch
 * policy when `isCosmos` is truthy.
 *
 * @param pipeline - Client pipeline
 * @param bodyParts - passed to the transaction request assemble policy
 * @param changesetId - passed to the transaction request assemble policy
 * @param isCosmos - whether the Cosmos patch policy should be added as well
 */
export function prepateTransactionPipeline(pipeline, bodyParts, changesetId, isCosmos) {
    // First, we need to clear all the existing policies to make sure we start
    // with a fresh state.
    pipeline.getOrderedPolicies().forEach((existing) => {
        pipeline.removePolicy({ name: existing.name });
    });
    // With the clear state we now initialize the pipelines required for intercepting the requests.
    // Use transaction assemble policy to assemble request and intercept request from going to wire
    const serialization = serializationPolicy();
    pipeline.addPolicy(serialization, { phase: "Serialize" });
    const headerFilter = transactionHeaderFilterPolicy();
    pipeline.addPolicy(headerFilter);
    const assemble = transactionRequestAssemblePolicy(bodyParts, changesetId);
    pipeline.addPolicy(assemble);
    if (!isCosmos) {
        return;
    }
    // The Cosmos patch policy is placed after the header filter and before
    // serialization and request assembly.
    const ordering = {
        afterPolicies: [transactionHeaderFilterPolicyName],
        beforePolicies: [serializationPolicyName, transactionRequestAssemblePolicyName],
    };
    pipeline.addPolicy(cosmosPatchPolicy(), ordering);
}
//# sourceMappingURL=TableTransaction.js.map