你有没有发现:PostgreSQL可以做到Redis能做的一切

我之前用的是一套很典型的Web应用技术栈:



两个数据库,两个体系需要管理,也意味着多了两处故障风险点。


后来我意识到:PostgreSQL可以做到Redis能做的一切。


于是我彻底移除了Redis,迁移过程是这样的。


一、设置:我使用Redis的目的


在替换之前,Redis主要处理三件事:


1、缓存(使用率70%)


// Cache API responses
await redis.set(`user:${id}`, JSON.stringify(user), 'EX', 3600);


2、发布订阅(使用率20%)


// Real-time notifications
redis.publish('notifications', JSON.stringify({ userId, message }));


3、后台消息队列(使用率10%)


// Using Bull/BullMQ
queue.add('send-email', { to, subject, body });


痛点:



二、我为什么考虑替换Redis


原因一:成本


我的Redis配置:



PostgreSQL:



节省成本:每月约100美元


原因二:运行复杂性


使用 Redis:


Postgres backup ✅
Redis backup ❓ (RDB? AOF? Both?)
Postgres monitoring ✅
Redis monitoring ❓
Postgres failover ✅
Redis Sentinel/Cluster ❓


不使用Redis:


Postgres backup ✅
Postgres monitoring ✅
Postgres failover ✅


系统依赖组件更少。


原因三:数据一致性


经典问题:


// Update database
await db.query('UPDATE users SET name = $1 WHERE id = $2', [name, id]);


// Invalidate cache
await redis.del(`user:${id}`);


// ⚠️ What if Redis is down?
// ⚠️ What if this fails?
// Now cache and DB are out of sync


在PostgreSQL中,这类问题通过事务即可解决。


三、PostgreSQL特性


1、使用非日志表进行缓存


Redis:


await redis.set('session:abc123', JSON.stringify(sessionData), 'EX', 3600);


PostgreSQL:


CREATE UNLOGGED TABLE cache (
  key TEXT PRIMARY KEY,
  value JSONB NOT NULL,
  expires_at TIMESTAMPTZ NOT NULL
);


CREATE INDEX idx_cache_expires ON cache(expires_at);


插入:


INSERT INTO cache (key, value, expires_at)
VALUES ($1, $2, NOW() + INTERVAL '1 hour')
ON CONFLICT (key) DO UPDATE
  SET value = EXCLUDED.value,
      expires_at = EXCLUDED.expires_at;


读:


SELECT value FROM cache
WHERE key = $1 AND expires_at > NOW();


清理(定期运行):


DELETE FROM cache WHERE expires_at < NOW();


什么是非日志表?



表现:


Redis SET: 0.05ms
Postgres UNLOGGED INSERT: 0.08ms


用作缓存已经完全够用。


2、基于LISTEN或NOTIFY实现发布订阅功能


接下来就精彩了。


PostgreSQL具有原生的发布订阅功能,但大多数开发人员并不了解。


1)Redis的发布订阅功能


// Publisher
redis.publish('notifications', JSON.stringify({ userId: 123, msg: 'Hello' }));


// Subscriber
redis.subscribe('notifications');
redis.on('message', (channel, message) => {
  console.log(message);
});


2)PostgreSQL的发布订阅功能


-- Publisher
NOTIFY notifications, '{"userId": 123, "msg": "Hello"}';


// Subscriber (Node.js with pg)
const client = new Client({ connectionString: process.env.DATABASE_URL });
await client.connect();


await client.query('LISTEN notifications');


client.on('notification', (msg) => {
  const payload = JSON.parse(msg.payload);
  console.log(payload);
});


性能对比:


Redis pub/sub latency: 1-2ms
Postgres NOTIFY latency: 2-5ms


性能略低,但优势明显:



3)实际应用场景:实时日志追踪


在我的日志管理应用中,需要实现日志实时流式推送


使用Redis:


// When new log arrives
await db.query('INSERT INTO logs ...');
await redis.publish('logs:new', JSON.stringify(log));


// Frontend listens
redis.subscribe('logs:new');


问题:有两个操作,如果发布失败怎么办?


使用PostgreSQL:


CREATE FUNCTION notify_new_log() RETURNS TRIGGER AS $
BEGIN
  PERFORM pg_notify('logs_new', row_to_json(NEW)::text);
  RETURN NEW;
END;
$ LANGUAGE plpgsql;


CREATE TRIGGER log_inserted
AFTER INSERT ON logs
FOR EACH ROW EXECUTE FUNCTION notify_new_log();


现在整个操作是原子性的:插入数据与通知推送,要么同时生效,要么都不执行。


// Frontend (via SSE)
app.get('/logs/stream', async (req, res) => {
  const client = await pool.connect();


    res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
  });


    await client.query('LISTEN logs_new');


      client.on('notification', (msg) => {
    res.write(`data: ${msg.payload}

`);
  });
});


结果:无需Redis即可实现实时日志流传输。


3、基于SKIP LOCKED实现任务队列


Redis(使用Bull或者BullMQ):


queue.add('send-email', { to, subject, body });


queue.process('send-email', async (job) => {
  await sendEmail(job.data);
});


PostgreSQL:


CREATE TABLE jobs (
  id BIGSERIAL PRIMARY KEY,
  queue TEXT NOT NULL,
  payload JSONB NOT NULL,
  attempts INT DEFAULT 0,
  max_attempts INT DEFAULT 3,
  scheduled_at TIMESTAMPTZ DEFAULT NOW(),
  created_at TIMESTAMPTZ DEFAULT NOW()
);


CREATE INDEX idx_jobs_queue ON jobs(queue, scheduled_at) 
WHERE attempts < max_attempts;


入队:


INSERT INTO jobs (queue, payload)
VALUES ('send-email', '{"to": "user@example.com", "subject": "Hi"}');


工作进程(出队):


WITH next_job AS (
  SELECT id FROM jobs
  WHERE queue = $1
    AND attempts < max_attempts
    AND scheduled_at <= NOW()
  ORDER BY scheduled_at
  LIMIT 1
  FOR UPDATE SKIP LOCKED
)
UPDATE jobs
SET attempts = attempts + 1
FROM next_job
WHERE jobs.id = next_job.id
RETURNING *;


神奇之处:FOR UPDATE SKIP LOCKED


这让PostgreSQL成为了无锁队列



表现:


Redis BRPOP: 0.1ms
Postgres SKIP LOCKED: 0.3ms


对于大多数业务负载而言,性能差异可以忽略不计。


4、限流


Redis(经典限流方案):


const key = `ratelimit:${userId}`;
const count = await redis.incr(key);
if (count === 1) {
  await redis.expire(key, 60); // 60 seconds
}


if (count > 100) {
  throw new Error('Rate limit exceeded');
}


PostgreSQL:


CREATE TABLE rate_limits (
  user_id INT PRIMARY KEY,
  request_count INT DEFAULT 0,
  window_start TIMESTAMPTZ DEFAULT NOW()
);


-- Check and increment
WITH current AS (
  SELECT 
    request_count,
    CASE 
      WHEN window_start < NOW() - INTERVAL '1 minute'
      THEN 1  -- Reset counter
      ELSE request_count + 1
    END AS new_count
  FROM rate_limits
  WHERE user_id = $1
  FOR UPDATE
)
UPDATE rate_limits
SET 
  request_count = (SELECT new_count FROM current),
  window_start = CASE
    WHEN window_start < NOW() - INTERVAL '1 minute'
    THEN NOW()
    ELSE window_start
  END
WHERE user_id = $1
RETURNING request_count;


或者用窗口函数更简单:


CREATE TABLE api_requests (
  user_id INT NOT NULL,
  created_at TIMESTAMPTZ DEFAULT NOW()
);


-- Check rate limit
SELECT COUNT(*) FROM api_requests
WHERE user_id = $1
  AND created_at > NOW() - INTERVAL '1 minute';


  -- If under limit, insert
INSERT INTO api_requests (user_id) VALUES ($1);


-- Cleanup old requests periodically
DELETE FROM api_requests WHERE created_at < NOW() - INTERVAL '5 minutes';


Postgres的适用场景:



Redis的适用场景:



5、基于JSONB实现会话存储


Redis:


await redis.set(`session:${sessionId}`, JSON.stringify(sessionData), 'EX', 86400);


PostgreSQL:


CREATE TABLE sessions (
  id TEXT PRIMARY KEY,
  data JSONB NOT NULL,
  expires_at TIMESTAMPTZ NOT NULL
);


CREATE INDEX idx_sessions_expires ON sessions(expires_at);


-- Insert/Update
INSERT INTO sessions (id, data, expires_at)
VALUES ($1, $2, NOW() + INTERVAL '24 hours')
ON CONFLICT (id) DO UPDATE
  SET data = EXCLUDED.data,
      expires_at = EXCLUDED.expires_at;


      -- Read
SELECT data FROM sessions
WHERE id = $1 AND expires_at > NOW();


附加内容:JSONB 运算符


你可以在会话内部进行查询:


-- Find all sessions for a specific user
SELECT * FROM sessions
WHERE data->>'userId' = '123';


-- Find sessions with specific role
SELECT * FROM sessions
WHERE data->'user'->>'role' = 'admin';


你用Redis做不到这一点!


四、实际生产环境基准测试


我用生产数据集完成了基准测试:


1、测试设置



2、结果


操作

Redis

PostgreSQL

不同之处

缓存集

0.05毫秒

0.08毫秒

速度降低 60%

缓存获取

0.04毫秒

0.06毫秒

速度降低 50%

发布订阅

1.2毫秒

3.1毫秒

速度降低 158%

队列推送

0.08毫秒

0.15毫秒

速度降低 87%

队列弹出

0.12毫秒

0.31毫秒

速度降低 158%


PostgreSQL速度较慢,但是:



3、合并执行(真正的胜利)


场景:插入数据 + 缓存失效 + 通知订阅者


使用Redis


await db.query('INSERT INTO posts ...');       // 2ms
await redis.del('posts:latest');                // 1ms (network hop)
await redis.publish('posts:new', data);         // 1ms (network hop)
// Total: ~4ms


使用PostgreSQL:


BEGIN;
INSERT INTO posts ...;                          -- 2ms
DELETE FROM cache WHERE key = 'posts:latest';  -- 0.1ms (same connection)
NOTIFY posts_new, '...';                        -- 0.1ms (same connection)
COMMIT;
-- Total: ~2.2ms


当多个操作合并执行时,PostgreSQL速度更快。


五、哪些场景仍建议保留Redis


如果符合以下条件,请不要替换Redis:


1、需要极致的性能


Redis: 100,000+ ops/sec (single instance)
Postgres: 10,000-50,000 ops/sec


如果你每秒执行数百万次缓存读取操作,那就继续使用 Redis。


2、使用Redis特有的数据结构


Redis具备:



PostgreSQL 虽有对应实现,但使用起来更为繁琐:


-- Leaderboard in Postgres (slower)
SELECT user_id, score
FROM leaderboard
ORDER BY score DESC
LIMIT 10;


-- vs Redis
ZREVRANGE leaderboard 0 9 WITHSCORES


3、架构需要独立缓存层


如果你的架构要求独立的缓存层(例如微服务架构),建议保留Redis。


六、迁移方案


不要一夜之间就彻底放弃Redis,以下是我的做法:


第一阶段:并排共存(第1周)


// Write to both
await redis.set(key, value);
await pg.query('INSERT INTO cache ...');


// Read from Redis (still primary)
let data = await redis.get(key);


监控:对比命中率、延迟。


第二阶段:从Postgres读取数据(第2周)


// Try Postgres first
let data = await pg.query('SELECT value FROM cache WHERE key = $1', [key]);


// Fallback to Redis
if (!data) {
  data = await redis.get(key);
}


监控:错误率、性能。


第三阶段:仅写入Postgres(第3周)


// Only write to Postgres
await pg.query('INSERT INTO cache ...');


监控:所有功能是否正常运行?


第四阶段:移除Redis(第4周)


# Turn off Redis
# Watch for errors
# Nothing breaks? Success!


七、代码示例:完整实现


1、缓存模块(PostgreSQL)


// cache.js
class PostgresCache {
  constructor(pool) {
    this.pool = pool;
  }


    async get(key) {
    const result = await this.pool.query(
      'SELECT value FROM cache WHERE key = $1 AND expires_at > NOW()',
      [key]
    );
    return result.rows[0]?.value;
  }


    async set(key, value, ttlSeconds = 3600) {
    await this.pool.query(
      `INSERT INTO cache (key, value, expires_at)
       VALUES ($1, $2, NOW() + INTERVAL '${ttlSeconds} seconds')
       ON CONFLICT (key) DO UPDATE
         SET value = EXCLUDED.value,
             expires_at = EXCLUDED.expires_at`,
      [key, value]
    );
  }


    async delete(key) {
    await this.pool.query('DELETE FROM cache WHERE key = $1', [key]);
  }


    async cleanup() {
    await this.pool.query('DELETE FROM cache WHERE expires_at < NOW()');
  }
}


module.exports = PostgresCache;


2、发布订阅模块


// pubsub.js
class PostgresPubSub {
  constructor(pool) {
    this.pool = pool;
    this.listeners = new Map();
  }


    async publish(channel, message) {
    const payload = JSON.stringify(message);
    await this.pool.query('SELECT pg_notify($1, $2)', [channel, payload]);
  }


    async subscribe(channel, callback) {
    const client = await this.pool.connect();
    await client.query(`LISTEN ${channel}`);
    client.on('notification', (msg) => {
      if (msg.channel === channel) {
        callback(JSON.parse(msg.payload));
      }
    });


        this.listeners.set(channel, client);
  }


    async unsubscribe(channel) {
    const client = this.listeners.get(channel);
    if (client) {
      await client.query(`UNLISTEN ${channel}`);
      client.release();
      this.listeners.delete(channel);
    }
  }
}


module.exports = PostgresPubSub;


3、任务队列模块


// queue.js
class PostgresQueue {
  constructor(pool) {
    this.pool = pool;
  }


    async enqueue(queue, payload, scheduledAt = new Date()) {
    await this.pool.query(
      'INSERT INTO jobs (queue, payload, scheduled_at) VALUES ($1, $2, $3)',
      [queue, payload, scheduledAt]
    );
  }


    async dequeue(queue) {
    const result = await this.pool.query(
      `WITH next_job AS (
        SELECT id FROM jobs
        WHERE queue = $1
          AND attempts < max_attempts
          AND scheduled_at <= NOW()
        ORDER BY scheduled_at
        LIMIT 1
        FOR UPDATE SKIP LOCKED
      )
      UPDATE jobs
      SET attempts = attempts + 1
      FROM next_job
      WHERE jobs.id = next_job.id
      RETURNING jobs.*`,
      [queue]
    );


        return result.rows[0];
  }


    async complete(jobId) {
    await this.pool.query('DELETE FROM jobs WHERE id = $1', [jobId]);
  }


    async fail(jobId, error) {
    await this.pool.query(
      `UPDATE jobs
       SET attempts = max_attempts,
           payload = payload || jsonb_build_object('error', $2)
       WHERE id = $1`,
      [jobId, error.message]
    );
  }
}


module.exports = PostgresQueue;


八、性能优化技巧


1、使用连接池


const { Pool } = require('pg');


const pool = new Pool({
  max: 20,  // Max connections
  idleTimeoutMillis: 30000,
  connectionTimeoutMillis: 2000,
});


2、添加合适的索引


CREATE INDEX CONCURRENTLY idx_cache_key ON cache(key) WHERE expires_at > NOW();
CREATE INDEX CONCURRENTLY idx_jobs_pending ON jobs(queue, scheduled_at) 
  WHERE attempts < max_attempts;


3、调整PostgreSQL配置


# postgresql.conf
shared_buffers = 2GB           # 25% of RAM
effective_cache_size = 6GB     # 75% of RAM
work_mem = 50MB                # For complex queries
maintenance_work_mem = 512MB   # For VACUUM


4、定期维护


-- Run daily
VACUUM ANALYZE cache;
VACUUM ANALYZE jobs;


-- Or enable autovacuum (recommended)
ALTER TABLE cache SET (autovacuum_vacuum_scale_factor = 0.1);


九、三个月后的结果


我省下了:



我失去了:



我会再次这样做吗?就这个业务场景而言:会。


是否推荐所有人都这么做?不推荐。


十、决策矩阵


如果满足以下条件,可用Postgres替换Redis:



以下场景建议保留Redis:



十一、参考资料


1、PostgreSQL 特性



2、工具



3、其他解决方案



十二、最后


我用PostgreSQL替换了Redis的这些场景:



结果:



适合这样做的场景:



不适合这样做的场景:



你是否用过Postgres替换Redis(或反过来用Redis替换Postgres)?实际体验如何?欢迎在评论区分享你的基准测试数据!


作者丨Polliog 编译丨dbaplus社群

来源丨网址:
https://dev.to/polliog/i-replaced-redis-with-postgresql-and-its-faster-4942

dbaplus社群欢迎广大技术人员投稿,投稿邮箱:editor@dbaplus.cn


活动推荐

5月22日,2026 XCOPS 智能运维管理人年会「广州站」重磅来袭!聚焦大模型迭代、AI Agent 深度应用等技术热点,邀请一众行业领军人物、技术大咖,从技术架构、实战案例到科研成果,与大家一起探索AI应用于智能运维与数据库的最佳方式,共同破解垂类智能体落地、多Agent协同、数据库自治技术工程化、核心系统信创与智能化平衡等现实难题。

复制(点击)链接即可报名:XCOPS智能运维管理人年会-广州站

展开阅读全文

更新时间:2026-05-21

标签:科技   发现   缓存   队列   操作   场景   性能   速度   美元   数据   架构   数据结构

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight All Rights Reserved.
Powered By 61893.com 闽ICP备11008920号
闽公网安备35020302034903号

Top