// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
import { isNamedKeyCredential, isSASCredential, isTokenCredential, } from "@azure/core-auth";
import { ServiceClient, serializationPolicy, serializationPolicyName, } from "@azure/core-client";
import { RestError, createHttpHeaders, createPipelineRequest, } from "@azure/core-rest-pipeline";
import { getInitialTransactionBody, getTransactionHttpRequestBody, } from "./utils/transactionHelpers";
import { transactionHeaderFilterPolicy, transactionHeaderFilterPolicyName, transactionRequestAssemblePolicy, transactionRequestAssemblePolicyName, } from "./TablePolicies";
import { SpanStatusCode } from "@azure/core-tracing";
import { cosmosPatchPolicy } from "./cosmosPathPolicy";
import { createSpan } from "./utils/tracing";
import { getAuthorizationHeader } from "./tablesNamedCredentialPolicy";
import { getTransactionHeaders } from "./utils/transactionHeaders";
import { isCosmosEndpoint } from "./utils/isCosmosEndpoint";
import { signURLWithSAS } from "./tablesSASTokenPolicy";
import { STORAGE_SCOPE } from "./utils/constants";
/**
 * Helper to build a list of transaction actions
 */
export class TableTransaction {
    /**
     * @param actions - optional initial list of actions; an empty list is used
     * when it is null or undefined. The given array is stored as-is and is
     * appended to by the methods below.
     */
    constructor(actions) {
        this.actions = actions !== null && actions !== undefined ? actions : [];
    }
    /**
     * Adds a create action to the transaction
     * @param entity - entity to create
     */
    createEntity(entity) {
        this.actions.push(["create", entity]);
    }
    /**
     * Adds a delete action to the transaction
     * @param partitionKey - partition key of the entity to delete
     * @param rowKey - rowKey of the entity to delete
     */
    deleteEntity(partitionKey, rowKey) {
        this.actions.push(["delete", { partitionKey, rowKey }]);
    }
    /**
     * Adds an update action to the transaction
     * @param entity - entity to update
     * @param updateMode - update mode, defaults to "Merge"
     */
    updateEntity(entity, updateMode = "Merge") {
        this.actions.push(["update", entity, updateMode]);
    }
    /**
     * Adds an upsert action to the transaction, which inserts if the entity doesn't exist or updates the existing one
     * @param entity - entity to upsert
     * @param updateMode - update mode, defaults to "Merge"
     */
    upsertEntity(entity, updateMode = "Merge") {
        this.actions.push(["upsert", entity, updateMode]);
    }
}
/**
 * TableTransaction collects sub-operations that can be submitted together via submitTransaction
 */
export class InternalTableTransaction {
    /**
     * @param url - Tables account url
     * @param partitionKey - partition key every operation in the transaction must target
     * @param transactionId - id used to build the transaction body and headers
     * @param changesetId - id used to build the changeset section of the body
     * @param clientOptions - options handed to the ServiceClient that sends the transaction
     * @param interceptClient - client whose pipeline is reconfigured by this class and whose
     * entity methods are called to queue each sub-operation
     * @param credential - credential to authenticate the transaction request
     * @param allowInsecureConnection - forwarded to the pipeline request, defaults to false
     */
    constructor(url, partitionKey, transactionId, changesetId, clientOptions, interceptClient, credential, allowInsecureConnection = false) {
        this.clientOptions = clientOptions;
        this.credential = credential;
        this.url = url;
        this.interceptClient = interceptClient;
        this.allowInsecureConnection = allowInsecureConnection;
        // Initialize Reset-able properties
        this.resetableState = this.initializeSharedState(transactionId, changesetId, partitionKey);
        // Depending on the auth method used we need to build the url
        if (!credential) {
            // When the SAS token is provided as part of the URL we need to move it after $batch.
            // Split at the first "?" only, so the whole query string is preserved.
            const queryStart = url.indexOf("?");
            const sas = queryStart === -1 ? "" : url.slice(queryStart);
            this.url = queryStart === -1 ? url : url.slice(0, queryStart);
            this.url = `${this.getUrlWithSlash()}$batch${sas}`;
        }
        else {
            // When using a SharedKey credential no SAS token is needed
            this.url = `${this.getUrlWithSlash()}$batch`;
        }
    }
    /**
     * Resets the state of the Transaction.
     * @param transactionId - id for the new transaction
     * @param changesetId - id for the new changeset
     * @param partitionKey - partition key the new set of operations must target
     */
    reset(transactionId, changesetId, partitionKey) {
        this.resetableState = this.initializeSharedState(transactionId, changesetId, partitionKey);
    }
    /**
     * Builds a fresh state object and re-prepares the intercept client's pipeline
     * so that it writes into the new body parts.
     * @returns the new state: ids, partition key, an empty pending-operations list and the body parts
     */
    initializeSharedState(transactionId, changesetId, partitionKey) {
        const pendingOperations = [];
        const bodyParts = getInitialTransactionBody(transactionId, changesetId);
        const isCosmos = isCosmosEndpoint(this.url);
        prepateTransactionPipeline(this.interceptClient.pipeline, bodyParts, changesetId, isCosmos);
        return {
            transactionId,
            changesetId,
            partitionKey,
            pendingOperations,
            bodyParts,
        };
    }
    /**
     * Adds a createEntity operation to the transaction
     * @param entity - Entity to create
     */
    createEntity(entity) {
        this.checkPartitionKey(entity.partitionKey);
        this.resetableState.pendingOperations.push(this.interceptClient.createEntity(entity));
    }
    /**
     * Adds a createEntity operation to the transaction per each entity in the entities array
     * @param entities - Array of entities to create
     */
    createEntities(entities) {
        for (const entity of entities) {
            this.createEntity(entity);
        }
    }
    /**
     * Adds a deleteEntity operation to the transaction
     * @param partitionKey - Partition key of the entity to delete
     * @param rowKey - Row key of the entity to delete
     * @param options - Options for the delete operation
     */
    deleteEntity(partitionKey, rowKey, options) {
        this.checkPartitionKey(partitionKey);
        this.resetableState.pendingOperations.push(this.interceptClient.deleteEntity(partitionKey, rowKey, options));
    }
    /**
     * Adds an updateEntity operation to the transaction
     * @param entity - Entity to update
     * @param mode - Update mode (Merge or Replace)
     * @param options - Options for the update operation
     */
    updateEntity(entity, mode, options) {
        this.checkPartitionKey(entity.partitionKey);
        this.resetableState.pendingOperations.push(this.interceptClient.updateEntity(entity, mode, options));
    }
    /**
     * Adds an upsertEntity operation to the transaction
     * @param entity - The properties for the table entity.
     * @param mode - The different modes for updating the entity:
     *               - Merge: Updates an entity by updating the entity's properties without replacing the existing entity.
     *               - Replace: Updates an existing entity by replacing the entire entity.
     * @param options - The options parameters.
     */
    upsertEntity(entity, mode, options) {
        this.checkPartitionKey(entity.partitionKey);
        this.resetableState.pendingOperations.push(this.interceptClient.upsertEntity(entity, mode, options));
    }
    /**
     * Submits the operations in the transaction
     * @returns the parsed transaction response (see parseTransactionResponse)
     * @throws whatever the request or the response parsing throws; the error is
     * recorded on the tracing span and rethrown unchanged
     */
    async submitTransaction() {
        await Promise.all(this.resetableState.pendingOperations);
        const body = getTransactionHttpRequestBody(this.resetableState.bodyParts, this.resetableState.transactionId, this.resetableState.changesetId);
        // Work on a shallow copy so the options object handed to the constructor is not mutated.
        const options = Object.assign({}, this.clientOptions);
        if (isTokenCredential(this.credential)) {
            options.credentialScopes = STORAGE_SCOPE;
            options.credential = this.credential;
        }
        const client = new ServiceClient(options);
        const headers = getTransactionHeaders(this.resetableState.transactionId);
        const { span, updatedOptions } = createSpan("TableTransaction-submitTransaction", {});
        const request = createPipelineRequest({
            url: this.url,
            method: "POST",
            body,
            headers: createHttpHeaders(headers),
            tracingOptions: updatedOptions.tracingOptions,
            allowInsecureConnection: this.allowInsecureConnection,
        });
        if (isNamedKeyCredential(this.credential)) {
            const authHeader = getAuthorizationHeader(request, this.credential);
            request.headers.set("Authorization", authHeader);
        }
        else if (isSASCredential(this.credential)) {
            signURLWithSAS(request, this.credential);
        }
        try {
            const rawTransactionResponse = await client.sendRequest(request);
            return parseTransactionResponse(rawTransactionResponse);
        }
        catch (error) {
            span.setStatus({
                code: SpanStatusCode.ERROR,
                message: error.message,
            });
            throw error;
        }
        finally {
            span.end();
        }
    }
    /**
     * Ensures an operation targets the partition key of the current transaction state.
     * @param partitionKey - partition key of the operation being added
     * @throws Error when it differs from the transaction's partition key
     */
    checkPartitionKey(partitionKey) {
        if (this.resetableState.partitionKey !== partitionKey) {
            throw new Error("All operations in a transaction must target the same partitionKey");
        }
    }
    /**
     * @returns this.url, with a trailing "/" appended when it does not already end with one
     */
    getUrlWithSlash() {
        return this.url.endsWith("/") ? this.url : `${this.url}/`;
    }
}
/**
 * Parses the raw response of a transaction request into per-operation results.
 * @param transactionResponse - raw response; `status`, `bodyAsText` and `request` are read
 * @returns an object with the overall `status`, the parsed `subResponses`
 * (each with `status` and, when found in the text, `rowKey` and `etag`) and a
 * `getResponseForEntity(rowKey)` lookup over those sub-responses
 * @throws RestError when the overall status is not 2xx or a sub-response carries a JSON body
 * @throws Error when a sub-response status cannot be extracted or is not an integer
 */
export function parseTransactionResponse(transactionResponse) {
    const subResponsePrefix = `--changesetresponse_`;
    const status = transactionResponse.status;
    const rawBody = transactionResponse.bodyAsText || "";
    const splitBody = rawBody.split(subResponsePrefix);
    const isSuccessByStatus = 200 <= status && status < 300;
    if (!isSuccessByStatus) {
        handleBodyError(rawBody, status, transactionResponse.request, transactionResponse);
    }
    // Dropping the first and last elements as they are the boundaries
    // we just care about sub request content
    const subResponses = splitBody.slice(1, splitBody.length - 1);
    const responses = subResponses.map((subResponse) => {
        const statusMatch = subResponse.match(/HTTP\/1.1 ([0-9]*)/);
        if (statusMatch === null || statusMatch.length !== 2) {
            throw new Error(`Couldn't extract status from sub-response:\n ${subResponse}`);
        }
        const subResponseStatus = Number.parseInt(statusMatch[1], 10);
        if (!Number.isInteger(subResponseStatus)) {
            throw new Error(`Expected sub-response status to be an integer ${subResponseStatus}`);
        }
        // Any JSON-looking body in a sub-response is handed to handleBodyError, which always throws.
        const bodyMatch = subResponse.match(/\{(.*)\}/);
        if (bodyMatch !== null && bodyMatch.length === 2) {
            handleBodyError(bodyMatch[0], subResponseStatus, transactionResponse.request, transactionResponse);
        }
        const etagMatch = subResponse.match(/ETag: (.*)/);
        const rowKeyMatch = subResponse.match(/RowKey='(.*)'/);
        // rowKey and etag are only present on the result when they were found in the text.
        const parsed = { status: subResponseStatus };
        if (rowKeyMatch !== null && rowKeyMatch.length === 2) {
            parsed.rowKey = rowKeyMatch[1];
        }
        if (etagMatch !== null && etagMatch.length === 2) {
            parsed.etag = etagMatch[1];
        }
        return parsed;
    });
    return {
        status,
        subResponses: responses,
        getResponseForEntity: (rowKey) => responses.find((r) => r.rowKey === rowKey),
    };
}
/**
 * Always throws a RestError built from a response body.
 * The message and code come from the body's "odata.error" entry when present;
 * otherwise the message is "Transaction Failed" and the code is undefined.
 * @param bodyAsText - body text, parsed as JSON when possible
 * @param statusCode - status code attached to the error
 * @param request - request attached to the error
 * @param response - response attached to the error
 */
function handleBodyError(bodyAsText, statusCode, request, response) {
    let parsedError;
    try {
        parsedError = JSON.parse(bodyAsText);
    }
    catch (_parseFailure) {
        // Deliberate best effort: a body that is not JSON falls back to the default message.
        parsedError = {};
    }
    let message = "Transaction Failed";
    let code;
    // Only transaction sub-responses return body
    if (parsedError && parsedError["odata.error"]) {
        const error = parsedError["odata.error"];
        const errorMessage = error.message;
        const messageValue = errorMessage !== null && errorMessage !== undefined ? errorMessage.value : undefined;
        if (messageValue !== null && messageValue !== undefined) {
            message = messageValue;
        }
        code = error.code;
    }
    throw new RestError(message, {
        code,
        statusCode,
        request,
        response,
    });
}
/**
 * Prepares the transaction pipeline to intercept operations
 * @param pipeline - Client pipeline; all of its current policies are removed
 * @param bodyParts - body parts passed to the transaction assemble policy
 * @param changesetId - changeset id passed to the transaction assemble policy
 * @param isCosmos - when truthy, the cosmos patch policy is added as well
 */
export function prepateTransactionPipeline(pipeline, bodyParts, changesetId, isCosmos) {
    // First, we need to clear all the existing policies to make sure we start
    // with a fresh state.
    const policies = pipeline.getOrderedPolicies();
    for (const policy of policies) {
        pipeline.removePolicy({
            name: policy.name,
        });
    }
    // With the clear state we now initialize the pipelines required for intercepting the requests.
    // Use transaction assemble policy to assemble request and intercept request from going to wire
    pipeline.addPolicy(serializationPolicy(), { phase: "Serialize" });
    pipeline.addPolicy(transactionHeaderFilterPolicy());
    pipeline.addPolicy(transactionRequestAssemblePolicy(bodyParts, changesetId));
    if (isCosmos) {
        pipeline.addPolicy(cosmosPatchPolicy(), {
            afterPolicies: [transactionHeaderFilterPolicyName],
            beforePolicies: [serializationPolicyName, transactionRequestAssemblePolicyName],
        });
    }
}
//# sourceMappingURL=TableTransaction.js.map