MySQL 数据库适配器

在协同编辑场景中,数据的实时同步与持久化是系统稳定性的核心。SpreadJS 作为一款前端 Excel 表格控件,通过协同服务,可以支持多人同时在线编辑表格。然而,仅有内存级的同步并不足以满足企业级应用需求:如果服务器重启、内存溢出或用户需要回溯历史操作,数据极有可能丢失或无法查询。

发布于 2025/08/19 16:04

SpreadJS

在协同编辑场景中,数据的实时同步与持久化是系统稳定性的核心。SpreadJS 作为一款前端 Excel 表格控件,通过协同服务,可以支持多人同时在线编辑表格。然而,仅有内存级的同步并不足以满足企业级应用需求:如果服务器重启、内存溢出或用户需要回溯历史操作,数据极有可能丢失或无法查询。

为了解决这一问题,SpreadJS 在服务器端提供了 数据库适配器(IDatabaseAdapter) 机制。数据库适配器负责将协同编辑产生的 文档快照(snapshot) 和 操作流(operations,简称 op) 与持久化存储系统对接。这样,系统不仅能够保存文档的最新状态,还能完整追溯历史操作,支持版本回滚和数据分析。

目前官方内置了 MemoryDb、Postgres、Sqlite3 等适配器,但在实际业务系统中,MySQL 依旧是最常见、最稳定的企业级数据库之一。因此,本文将介绍如何基于 SpreadJS 的数据库适配器接口,实现 MySQL 适配器,并将其集成到 DocumentServices 中,从而满足以下典型业务场景:

  • 数据安全与持久化:文档快照和操作日志实时写入 MySQL,避免因服务器故障导致数据丢失。

  • 版本与历史查询:通过操作记录,可以回放或还原文档到任意历史时间点,满足审计和合规需求。

  • 横向扩展与系统集成:依托 MySQL 的事务、索引和集群能力,协同服务可以更好地与现有企业系统集成。

通过 MySQL 数据库适配的支持,企业能够在 SpreadJS 协同服务中获得更高的 稳定性、可扩展性和业务适配性,从而真正将前端在线表格能力与后端数据管理体系打通。

具体实现如下:

  • 参考建表SQL,包含了里程碑数据库

CREATE TABLE IF NOT EXISTS documents(
    id VARCHAR(255) PRIMARY KEY,
    type VARCHAR(255) NOT NULL,
    version INT NOT NULL,
    snapshot_version INT NOT NULL
);
CREATE TABLE IF NOT EXISTS operations(
    doc_id VARCHAR(255) NOT NULL,
    version INT NOT NULL,
    operation TEXT NOT NULL,
    PRIMARY KEY(doc_id, version),
    FOREIGN KEY(doc_id) REFERENCES documents(id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS snapshot_fragments(
    doc_id VARCHAR(255) NOT NULL,
    fragment_id VARCHAR(255) NOT NULL,
    data TEXT NOT NULL,
    PRIMARY KEY(doc_id, fragment_id),
    FOREIGN KEY(doc_id) REFERENCES documents(id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS milestone_snapshot(
    doc_id VARCHAR(255) NOT NULL,
    version INT NOT NULL,
    snapshot TEXT NOT NULL,
    PRIMARY KEY(doc_id, version),
FOREIGN KEY(doc_id) REFERENCES documents(id) ON DELETE CASCADE
);

列名并不限制规则,在后面适配器实现的时候做好对应即可。

如果有其他信息需要存储,可以新建表,在DocumentServices中使用中间件或者钩子函数来写其他信息。

  • 适配器参考实现,关于mySql数据库连接和SQL执行方式可以按照项目需求自定义修改。

export class MySQLDb extends Db {

constructor(pool) {
    super();
    this.pool = pool;
}

async getDocument(docId) {
    console.log("Fetching document:", docId);
    const connection = await this.pool.getConnection();
    const [rows] = await connection.execute(
        'SELECT * FROM documents WHERE id = ?',
        [docId]
    );
    console.log("Fetched document rows:", rows);
    connection.release();
    if (0 === rows.length) return;

    const row = rows[0];
    return {
        id: row.id,
        type: row.type,
        version: row.version,
        snapshotVersion: row.snapshot_version
    };
}
async getSnapshot(docId) {
    console.log("Fetching snapshot for document:", docId);
    const connection = await this.pool.getConnection();
    const [rows] = await connection.execute(
        'SELECT * FROM documents WHERE id = ?',
        [docId]
    );
    connection.release();
    console.log("Fetched snapshot rows:", rows);
    if (0 === rows.length) return;

    const row = rows[0];
    const fragments = await this.getFragments(docId);
    return {
        id: row.id,
        v: row.snapshot_version,
        type: row.type,
        fragments: fragments
    };
}

async getFragments(docId) {
    console.log("Fetching fragments for document:", docId);
    const connection = await this.pool.getConnection();
    const [rows] = await connection.execute(
        'SELECT fragment_id, data FROM snapshot_fragments WHERE doc_id = ?',
        [docId]
    );
    console.log("Fetched fragments rows:", rows);
    connection.release();
    const fragments = {};
    for (const row of rows) {
        fragments[row.fragment_id] = JSON.parse(row.data);
    }
    return fragments;
}

async getFragment(docId, fragmentId) {
    console.log("Fetching fragment:", docId, fragmentId);
    const connection = await this.pool.getConnection();
    const [rows] = await connection.execute(
        'SELECT data FROM snapshot_fragments WHERE doc_id = ? AND fragment_id = ?',
        [docId, fragmentId]
    );
    connection.release();
    console.log("Fetched fragment rows:", rows);
    if (0 === rows.length) return;
    return JSON.parse(rows[0].data);
}

async getOps(docId, from, to) {
    console.log("Fetching operations for document:", docId, "from version:", from, "to version:", to);
    const connection = await this.pool.getConnection();
    const [rows] = await connection.execute(
        'SELECT operation FROM operations WHERE doc_id = ? AND version >= ?' + (to ? ' AND version <= ?' : '') + " ORDER BY version",
        to ? [docId, from, to] : [docId, from]
    );
    connection.release();
    console.log("Fetched operations rows:", rows);
    if (0 === rows.length) return [];
    return rows.map(row => JSON.parse(row.operation));
}

async commitOp(docId, op, document) {
    console.log("Committing operation:", docId, op, document);
    const connection = await this.pool.getConnection();
    try {
        connection.beginTransaction();

        // query version from document db
        const [rows] = await connection.execute(
            'SELECT version FROM documents WHERE id = ?',
            [docId]
        );
        if (op.create) {
            console.log("Creating new document:", docId, rows);
            if (rows.length > 0) {
                await connection.rollback();
                return false;
            }
            await connection.execute(
                'INSERT INTO documents (id, type, version, snapshot_version) VALUES (?, ?, ?, ?)',
                [docId, document.type, document.version, document.snapshotVersion]
            );
            await connection.execute(
                'INSERT INTO operations (doc_id, version, operation) VALUES (?, ?, ?)',
                [docId, op.v, JSON.stringify(op)]
            );

            await connection.commit();
            console.log("Operation create successfully.");
            return true;
        }
        else if (op.del) {
            console.log("Deleting document:", docId, rows);
            if (rows.length === 0) {
                await connection.rollback();
                return false;
            }
            await connection.execute(
                'DELETE FROM documents WHERE id = ?',
                [docId]
            );
            await connection.commit();
            console.log("Operation delete successfully.");
            return true;
        }
        else {
            console.log("Updating operation:", docId, op, rows);
            if (rows.length === 0 || rows[0].version !== op.v) {
                await connection.rollback();
                return false;
            }
            await connection.execute(
                'INSERT INTO operations (doc_id, version, operation) VALUES (?, ?, ?)',
                [docId, op.v, JSON.stringify(op)]
            );
            await connection.execute(
                'UPDATE documents SET version = ? WHERE id = ?',
                [document.version, docId]
            );
            await connection.commit();
            console.log("Operation update successfully.");
            return true;

        }

    } catch (error) {
        console.error('Error committing operation:', error);
        await connection.rollback();
        return false;
    }
    finally {
        connection.release();
    }

}

async commitSnapshot(docId, snapshot) {
    console.log("Committing snapshot for document:", docId, snapshot);
    const connection = await this.pool.getConnection();
    try {
        connection.beginTransaction();

        // query snapshot_version from document db
        const [rows] = await connection.execute(
            'SELECT snapshot_version FROM documents WHERE id = ?',
            [docId]
        );
        if (0 === rows.length) {
            await connection.rollback();
            return false;
        }
        const currentSnapshotVersion = rows[0].snapshot_version;
        if (snapshot.fromVersion !== currentSnapshotVersion || snapshot.v <= currentSnapshotVersion) {
            await connection.rollback();
            return false;
        }

        await connection.execute(
            'UPDATE documents SET snapshot_version = ? WHERE id = ?',
            [snapshot.v, docId]
        );

        if (snapshot.fragmentsChanges.deleteSnapshot) {
            await connection.execute(
                'DELETE FROM snapshot_fragments WHERE doc_id = ?',
                [docId]
            );
        } else {
            const { createFragments, updateFragments, deleteFragments } = snapshot.fragmentsChanges;
            console.log("Committing snapshot fragments changes:", createFragments, updateFragments, deleteFragments);
            if (createFragments) {
                for (const [fragmentId, data] of Object.entries(createFragments)) {
                    await connection.execute(
                        'INSERT INTO snapshot_fragments (doc_id, fragment_id, data) VALUES (?, ?, ?)',
                        [docId, fragmentId, JSON.stringify(data)]
                    );
                }
            }
            if (updateFragments) {
                for (const [fragmentId, data] of Object.entries(updateFragments)) {
                    await connection.execute(
                        'UPDATE snapshot_fragments SET data = ? WHERE doc_id = ? AND fragment_id = ?',
                        [JSON.stringify(data), docId, fragmentId]
                    );
                }
            }
            if (deleteFragments) {
                for (const fragmentId of deleteFragments) {
                    await connection.execute(
                        'DELETE FROM snapshot_fragments WHERE doc_id = ? AND fragment_id = ?',
                        [docId, fragmentId]
                    );
                }
            }
        }

        await connection.commit();
        return true;

    } catch (error) {
        console.error('Error committing snapshot:', error);
        await connection.rollback();
        return false;
    } finally {
        connection.release();
    }
}

async close() {
    await this.pool.end();
}

}
  • 里程碑数据库适配参考实现

export class MySQLMilestoneDb {

    constructor(pool, interval) {
        this.pool = pool;
        if(interval){
            this.interval = interval; // Interval in seconds for saving milestones
        }
    }

    async saveMilestoneSnapshot(snapshot) {
        console.log("Saving milestone snapshot:", snapshot);
        const connection = await this.pool.getConnection();
        try {
            await connection.beginTransaction();
            await connection.execute(
                'INSERT INTO milestone_snapshot (doc_id, version, snapshot) VALUES (?, ?, ?)',
                [snapshot.id, snapshot.v, JSON.stringify(snapshot)]
            );
            await connection.commit();
            console.log("Milestone snapshot saved successfully.");
            return true;
        } catch (error) {
            console.error('Error saving milestone snapshot:', error);
            await connection.rollback();
            return false;
        } finally {
            connection.release();
        }
    }

    async getMilestoneSnapshot(id, version) {
        console.log("Fetching milestone snapshot for document:", id, "version:", version);
        const connection = await this.pool.getConnection();
        const [rows] = await connection.execute(
            'SELECT * FROM milestone_snapshot WHERE doc_id = ? AND version <= ? ORDER BY version DESC LIMIT 1',
            [id, version]
        );
        connection.release();
        console.log("Fetched milestone snapshot rows:", rows);
        if (0 === rows.length) return;

        const row = rows[0];
        return JSON.parse(row.snapshot);
    }
}
  • 在DocumentServices中配置db和milestoneDb

const mySqlAdapter = new MySQLDb(mySqlPool);
const MySQLMilestoneAdapter = new MySQLMilestoneDb(mySqlPool, 100);
const documentServices = new OT.DocumentServices({ 
    db: mySqlAdapter, 
    milestoneDb: MySQLMilestoneAdapter
 });
server.useFeature(OT.documentFeature(documentServices));

以上便是SpreadJS 协同服务MySQL数据库适配实现。

SpreadJS | 下载试用

纯前端表格控件SpreadJS,兼容 450 种以上的 Excel 公式,具备“高性能、跨平台、与 Excel 高度兼容”的产品特性,备受华为、苏宁易购、天弘基金等行业龙头企业的青睐,并被中国软件行业协会认定为“中国优秀软件产品”。SpreadJS 可为用户提供类 Excel 的功能,满足表格文档协同编辑、 数据填报、 类 Excel 报表设计等业务场景需求,极大的降低企业研发成本和项目交付风险。

如下资源列表,可以为您评估产品提供帮助:

相关产品
推荐相关案例
推荐相关资源
关注微信
葡萄城社区二维码

关注“葡萄城社区”

加微信获取技术资讯

加微信获取技术资讯

想了解更多信息,请联系我们, 随时掌握技术资源和产品动态