summaryrefslogtreecommitdiff
path: root/examples/redis-unstable/src/socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'examples/redis-unstable/src/socket.c')
-rw-r--r--examples/redis-unstable/src/socket.c468
1 files changed, 0 insertions, 468 deletions
diff --git a/examples/redis-unstable/src/socket.c b/examples/redis-unstable/src/socket.c
deleted file mode 100644
index 226b414..0000000
--- a/examples/redis-unstable/src/socket.c
+++ /dev/null
@@ -1,468 +0,0 @@
-/*
- * Copyright (c) 2019-Present, Redis Ltd.
- * All rights reserved.
- *
- * Licensed under your choice of (a) the Redis Source Available License 2.0
- * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
- * GNU Affero General Public License v3 (AGPLv3).
- */
-
-#include "server.h"
-#include "connhelpers.h"
-
-/* The connections module provides a lean abstraction of network connections
- * to avoid direct socket and async event management across the Redis code base.
- *
- * It does NOT provide advanced connection features commonly found in similar
- * libraries such as complete in/out buffer management, throttling, etc. These
- * functions remain in networking.c.
- *
- * The primary goal is to allow transparent handling of TCP and TLS based
- * connections. To do so, connections have the following properties:
- *
- * 1. A connection may live before its corresponding socket exists. This
- * allows various context and configuration setting to be handled before
- * establishing the actual connection.
- * 2. The caller may register/unregister logical read/write handlers to be
- * called when the connection has data to read from/can accept writes.
- * These logical handlers may or may not correspond to actual AE events,
- * depending on the implementation (for TCP they are; for TLS they aren't).
- */
-
-static ConnectionType CT_Socket;
-
-/* When a connection is created we must know its type already, but the
- * underlying socket may or may not exist:
- *
- * - For accepted connections, it exists as we do not model the listen/accept
- * part; So caller calls connCreateSocket() followed by connAccept().
- * - For outgoing connections, the socket is created by the connection module
- * itself; So caller calls connCreateSocket() followed by connConnect(),
- * which registers a connect callback that fires on connected/error state
- * (and after any transport level handshake was done).
- *
- * NOTE: An earlier version relied on connections being part of other structs
- * and not independently allocated. This could lead to further optimizations
- * like using container_of(), etc. However it was discontinued in favor of
- * this approach for these reasons:
- *
- * 1. In some cases conns are created/handled outside the context of the
- * containing struct, in which case it gets a bit awkward to copy them.
- * 2. Future implementations may wish to allocate arbitrary data for the
- * connection.
- * 3. The container_of() approach is anyway risky because connections may
- * be embedded in different structs, not just client.
- */
-
-static connection *connCreateSocket(struct aeEventLoop *el) {
- connection *conn = zcalloc(sizeof(connection));
- conn->type = &CT_Socket;
- conn->fd = -1;
- conn->iovcnt = IOV_MAX;
- conn->el = el;
-
- return conn;
-}
-
-/* Create a new socket-type connection that is already associated with
- * an accepted connection.
- *
- * The socket is not ready for I/O until connAccept() was called and
- * invoked the connection-level accept handler.
- *
- * Callers should use connGetState() and verify the created connection
- * is not in an error state (which is not possible for a socket connection,
- * but could but possible with other protocols).
- */
-static connection *connCreateAcceptedSocket(struct aeEventLoop *el, int fd, void *priv) {
- UNUSED(priv);
- connection *conn = connCreateSocket(el);
- conn->fd = fd;
- conn->state = CONN_STATE_ACCEPTING;
- return conn;
-}
-
-static int connSocketConnect(connection *conn, const char *addr, int port, const char *src_addr,
- ConnectionCallbackFunc connect_handler) {
- int fd = anetTcpNonBlockBestEffortBindConnect(NULL,addr,port,src_addr);
- if (fd == -1) {
- conn->state = CONN_STATE_ERROR;
- conn->last_errno = errno;
- return C_ERR;
- }
-
- conn->fd = fd;
- conn->state = CONN_STATE_CONNECTING;
-
- conn->conn_handler = connect_handler;
- aeCreateFileEvent(conn->el, conn->fd, AE_WRITABLE,
- conn->type->ae_handler, conn);
-
- return C_OK;
-}
-
-/* ------ Pure socket connections ------- */
-
-/* A very incomplete list of implementation-specific calls. Much of the above shall
- * move here as we implement additional connection types.
- */
-
-static void connSocketShutdown(connection *conn) {
- if (conn->fd == -1) return;
-
- shutdown(conn->fd, SHUT_RDWR);
-}
-
-/* Close the connection and free resources. */
-static void connSocketClose(connection *conn) {
- if (conn->fd != -1) {
- if (conn->el) aeDeleteFileEvent(conn->el, conn->fd, AE_READABLE | AE_WRITABLE);
- close(conn->fd);
- conn->fd = -1;
- }
-
- /* If called from within a handler, schedule the close but
- * keep the connection until the handler returns.
- */
- if (connHasRefs(conn)) {
- conn->flags |= CONN_FLAG_CLOSE_SCHEDULED;
- return;
- }
-
- zfree(conn);
-}
-
-static int connSocketWrite(connection *conn, const void *data, size_t data_len) {
- int ret = write(conn->fd, data, data_len);
- if (ret < 0 && errno != EAGAIN) {
- conn->last_errno = errno;
-
- /* Don't overwrite the state of a connection that is not already
- * connected, not to mess with handler callbacks.
- */
- if (errno != EINTR && conn->state == CONN_STATE_CONNECTED)
- conn->state = CONN_STATE_ERROR;
- }
-
- return ret;
-}
-
-static int connSocketWritev(connection *conn, const struct iovec *iov, int iovcnt) {
- int ret = writev(conn->fd, iov, iovcnt);
- if (ret < 0 && errno != EAGAIN) {
- conn->last_errno = errno;
-
- /* Don't overwrite the state of a connection that is not already
- * connected, not to mess with handler callbacks.
- */
- if (errno != EINTR && conn->state == CONN_STATE_CONNECTED)
- conn->state = CONN_STATE_ERROR;
- }
-
- return ret;
-}
-
-static int connSocketRead(connection *conn, void *buf, size_t buf_len) {
- int ret = read(conn->fd, buf, buf_len);
- if (!ret) {
- conn->state = CONN_STATE_CLOSED;
- } else if (ret < 0 && errno != EAGAIN) {
- conn->last_errno = errno;
-
- /* Don't overwrite the state of a connection that is not already
- * connected, not to mess with handler callbacks.
- */
- if (errno != EINTR && conn->state == CONN_STATE_CONNECTED)
- conn->state = CONN_STATE_ERROR;
- }
-
- return ret;
-}
-
-static int connSocketAccept(connection *conn, ConnectionCallbackFunc accept_handler) {
- int ret = C_OK;
-
- if (conn->state != CONN_STATE_ACCEPTING) return C_ERR;
- conn->state = CONN_STATE_CONNECTED;
-
- connIncrRefs(conn);
- if (!callHandler(conn, accept_handler)) ret = C_ERR;
- connDecrRefs(conn);
-
- return ret;
-}
-
-/* Rebind the connection to another event loop, read/write handlers must not
- * be installed in the current event loop, otherwise it will cause two event
- * loops to manage the same connection at the same time. */
-static int connSocketRebindEventLoop(connection *conn, aeEventLoop *el) {
- serverAssert(!conn->el && !conn->read_handler && !conn->write_handler);
- conn->el = el;
- return C_OK;
-}
-
-/* Register a write handler, to be called when the connection is writable.
- * If NULL, the existing handler is removed.
- *
- * The barrier flag indicates a write barrier is requested, resulting with
- * CONN_FLAG_WRITE_BARRIER set. This will ensure that the write handler is
- * always called before and not after the read handler in a single event
- * loop.
- */
-static int connSocketSetWriteHandler(connection *conn, ConnectionCallbackFunc func, int barrier) {
- if (func == conn->write_handler) return C_OK;
-
- conn->write_handler = func;
- if (barrier)
- conn->flags |= CONN_FLAG_WRITE_BARRIER;
- else
- conn->flags &= ~CONN_FLAG_WRITE_BARRIER;
- if (!conn->write_handler)
- aeDeleteFileEvent(conn->el,conn->fd,AE_WRITABLE);
- else
- if (aeCreateFileEvent(conn->el,conn->fd,AE_WRITABLE,
- conn->type->ae_handler,conn) == AE_ERR) return C_ERR;
- return C_OK;
-}
-
-/* Register a read handler, to be called when the connection is readable.
- * If NULL, the existing handler is removed.
- */
-static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
- if (func == conn->read_handler) return C_OK;
-
- conn->read_handler = func;
- if (!conn->read_handler)
- aeDeleteFileEvent(conn->el,conn->fd,AE_READABLE);
- else
- if (aeCreateFileEvent(conn->el,conn->fd,
- AE_READABLE,conn->type->ae_handler,conn) == AE_ERR) return C_ERR;
- return C_OK;
-}
-
-static const char *connSocketGetLastError(connection *conn) {
- return strerror(conn->last_errno);
-}
-
-static void connSocketEventHandler(struct aeEventLoop *el, int fd, void *clientData, int mask)
-{
- UNUSED(el);
- UNUSED(fd);
- connection *conn = clientData;
-
- if (conn->state == CONN_STATE_CONNECTING &&
- (mask & AE_WRITABLE) && conn->conn_handler) {
-
- int conn_error = anetGetError(conn->fd);
- if (conn_error) {
- conn->last_errno = conn_error;
- conn->state = CONN_STATE_ERROR;
- } else {
- conn->state = CONN_STATE_CONNECTED;
- }
-
- if (!conn->write_handler) aeDeleteFileEvent(conn->el, conn->fd, AE_WRITABLE);
-
- if (!callHandler(conn, conn->conn_handler)) return;
- conn->conn_handler = NULL;
- }
-
- /* Normally we execute the readable event first, and the writable
- * event later. This is useful as sometimes we may be able
- * to serve the reply of a query immediately after processing the
- * query.
- *
- * However if WRITE_BARRIER is set in the mask, our application is
- * asking us to do the reverse: never fire the writable event
- * after the readable. In such a case, we invert the calls.
- * This is useful when, for instance, we want to do things
- * in the beforeSleep() hook, like fsync'ing a file to disk,
- * before replying to a client. */
- int invert = conn->flags & CONN_FLAG_WRITE_BARRIER;
-
- int call_write = (mask & AE_WRITABLE) && conn->write_handler;
- int call_read = (mask & AE_READABLE) && conn->read_handler;
-
- /* Handle normal I/O flows */
- if (!invert && call_read) {
- if (!callHandler(conn, conn->read_handler)) return;
- }
- /* Fire the writable event. */
- if (call_write) {
- if (!callHandler(conn, conn->write_handler)) return;
- }
- /* If we have to invert the call, fire the readable event now
- * after the writable one. */
- if (invert && call_read) {
- if (!callHandler(conn, conn->read_handler)) return;
- }
-}
-
-static void connSocketAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
- int cport, cfd;
- int max = server.max_new_conns_per_cycle;
- char cip[NET_IP_STR_LEN];
- UNUSED(mask);
- UNUSED(privdata);
-
- while(max--) {
- cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
- if (cfd == ANET_ERR) {
- if (anetAcceptFailureNeedsRetry(errno))
- continue;
- if (errno != EWOULDBLOCK)
- serverLog(LL_WARNING,
- "Accepting client connection: %s", server.neterr);
- return;
- }
- serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
- acceptCommonHandler(connCreateAcceptedSocket(el,cfd,NULL), 0, cip);
- }
-}
-
-static int connSocketAddr(connection *conn, char *ip, size_t ip_len, int *port, int remote) {
- if (anetFdToString(conn->fd, ip, ip_len, port, remote) == 0)
- return C_OK;
-
- conn->last_errno = errno;
- return C_ERR;
-}
-
-static int connSocketIsLocal(connection *conn) {
- char cip[NET_IP_STR_LEN + 1] = { 0 };
-
- if (connSocketAddr(conn, cip, sizeof(cip) - 1, NULL, 1) == C_ERR)
- return -1;
-
- return !strncmp(cip, "127.", 4) || !strcmp(cip, "::1");
-}
-
-static int connSocketListen(connListener *listener) {
- return listenToPort(listener);
-}
-
-static int connSocketBlockingConnect(connection *conn, const char *addr, int port, long long timeout) {
- int fd = anetTcpNonBlockConnect(NULL,addr,port);
- if (fd == -1) {
- conn->state = CONN_STATE_ERROR;
- conn->last_errno = errno;
- return C_ERR;
- }
-
- if ((aeWait(fd, AE_WRITABLE, timeout) & AE_WRITABLE) == 0) {
- conn->state = CONN_STATE_ERROR;
- conn->last_errno = ETIMEDOUT;
- return C_ERR;
- }
-
- conn->fd = fd;
- conn->state = CONN_STATE_CONNECTED;
- return C_OK;
-}
-
-/* Connection-based versions of syncio.c functions.
- * NOTE: This should ideally be refactored out in favor of pure async work.
- */
-
-static ssize_t connSocketSyncWrite(connection *conn, char *ptr, ssize_t size, long long timeout) {
- return syncWrite(conn->fd, ptr, size, timeout);
-}
-
-static ssize_t connSocketSyncRead(connection *conn, char *ptr, ssize_t size, long long timeout) {
- return syncRead(conn->fd, ptr, size, timeout);
-}
-
-static ssize_t connSocketSyncReadLine(connection *conn, char *ptr, ssize_t size, long long timeout) {
- return syncReadLine(conn->fd, ptr, size, timeout);
-}
-
-static const char *connSocketGetType(connection *conn) {
- (void) conn;
-
- return CONN_TYPE_SOCKET;
-}
-
-static ConnectionType CT_Socket = {
- /* connection type */
- .get_type = connSocketGetType,
-
- /* connection type initialize & finalize & configure */
- .init = NULL,
- .cleanup = NULL,
- .configure = NULL,
-
- /* ae & accept & listen & error & address handler */
- .ae_handler = connSocketEventHandler,
- .accept_handler = connSocketAcceptHandler,
- .addr = connSocketAddr,
- .is_local = connSocketIsLocal,
- .listen = connSocketListen,
-
- /* create/shutdown/close connection */
- .conn_create = connCreateSocket,
- .conn_create_accepted = connCreateAcceptedSocket,
- .shutdown = connSocketShutdown,
- .close = connSocketClose,
-
- /* connect & accept */
- .connect = connSocketConnect,
- .blocking_connect = connSocketBlockingConnect,
- .accept = connSocketAccept,
-
- /* event loop */
- .unbind_event_loop = NULL,
- .rebind_event_loop = connSocketRebindEventLoop,
-
- /* IO */
- .write = connSocketWrite,
- .writev = connSocketWritev,
- .read = connSocketRead,
- .set_write_handler = connSocketSetWriteHandler,
- .set_read_handler = connSocketSetReadHandler,
- .get_last_error = connSocketGetLastError,
- .sync_write = connSocketSyncWrite,
- .sync_read = connSocketSyncRead,
- .sync_readline = connSocketSyncReadLine,
-
- /* pending data */
- .has_pending_data = NULL,
- .process_pending_data = NULL,
-};
-
-int connBlock(connection *conn) {
- if (conn->fd == -1) return C_ERR;
- return anetBlock(NULL, conn->fd);
-}
-
-int connNonBlock(connection *conn) {
- if (conn->fd == -1) return C_ERR;
- return anetNonBlock(NULL, conn->fd);
-}
-
-int connEnableTcpNoDelay(connection *conn) {
- if (conn->fd == -1) return C_ERR;
- return anetEnableTcpNoDelay(NULL, conn->fd);
-}
-
-int connDisableTcpNoDelay(connection *conn) {
- if (conn->fd == -1) return C_ERR;
- return anetDisableTcpNoDelay(NULL, conn->fd);
-}
-
-int connKeepAlive(connection *conn, int interval) {
- if (conn->fd == -1) return C_ERR;
- return anetKeepAlive(NULL, conn->fd, interval);
-}
-
-int connSendTimeout(connection *conn, long long ms) {
- return anetSendTimeout(NULL, conn->fd, ms);
-}
-
-int connRecvTimeout(connection *conn, long long ms) {
- return anetRecvTimeout(NULL, conn->fd, ms);
-}
-
-int RedisRegisterConnectionTypeSocket(void)
-{
- return connTypeRegister(&CT_Socket);
-}